diff options
author | Aidan Skinner <aidan@apache.org> | 2008-05-16 15:20:52 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-05-16 15:20:52 +0000 |
commit | 2360f1b87fbe721f71e6a63065c489ca552c9b0c (patch) | |
tree | 91567ae0178870d59293380c79ba029569c8ac91 | |
parent | 716bf9a824491e93f39bc79ccd0caf9fbb36f2ee (diff) | |
download | qpid-python-2360f1b87fbe721f71e6a63065c489ca552c9b0c.tar.gz |
Merged revisions 653416 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line
QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@657101 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 20 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 17b4fa5d65..1df06a4911 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -469,7 +469,7 @@ public class AMQChannel synchronized (_unacknowledgedMessageMap.getLock()) { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag)); + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap)); checkSuspension(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index df7cecc940..0112d3b388 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -34,13 +34,18 @@ public class UnacknowledgedMessage public final long deliveryTag; private boolean _queueDeleted; + private final UnacknowledgedMessageMap _unacknowledgeMessageMap; - public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag) + public UnacknowledgedMessage(QueueEntry entry, + AMQShortString consumerTag, + long deliveryTag, + final UnacknowledgedMessageMap unacknowledgedMessageMap) { this.entry = entry; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; + _unacknowledgeMessageMap = unacknowledgedMessageMap; } public String toString() @@ -60,12 +65,20 @@ public class UnacknowledgedMessage public void discard(StoreContext storeContext) throws AMQException { - if (entry.getQueue() != null) + synchronized(_unacknowledgeMessageMap) { - entry.getQueue().dequeue(storeContext, entry); + if(_unacknowledgeMessageMap.contains(deliveryTag)) + { + + if (entry.getQueue() != null) + { + entry.getQueue().dequeue(storeContext, entry); + } + //if the queue is null then the message is waiting to be acked, but has been removed. + entry.getMessage().decrementReference(storeContext); + } } - //if the queue is null then the message is waiting to be acked, but has been removed. - entry.getMessage().decrementReference(storeContext); + } public AMQMessage getMessage() diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 42d9cccb4f..5fa3b6403d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -139,7 +139,7 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); + _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map)); } _acked = acked; _unacked = unacked; |