From 2360f1b87fbe721f71e6a63065c489ca552c9b0c Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 16 May 2008 15:20:52 +0000 Subject: Merged revisions 653416 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@657101 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 2 +- .../qpid/server/ack/UnacknowledgedMessage.java | 23 +++++++++++++++++----- .../java/org/apache/qpid/server/ack/TxAckTest.java | 2 +- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 17b4fa5d65..1df06a4911 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -469,7 +469,7 @@ public class AMQChannel synchronized (_unacknowledgedMessageMap.getLock()) { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag)); + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap)); checkSuspension(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index df7cecc940..0112d3b388 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -34,13 +34,18 @@ public class UnacknowledgedMessage public final long deliveryTag; private boolean _queueDeleted; + private final UnacknowledgedMessageMap _unacknowledgeMessageMap; - public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag) + public UnacknowledgedMessage(QueueEntry entry, + AMQShortString consumerTag, + long deliveryTag, + final UnacknowledgedMessageMap unacknowledgedMessageMap) { this.entry = entry; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; + _unacknowledgeMessageMap = unacknowledgedMessageMap; } public String toString() @@ -60,12 +65,20 @@ public class UnacknowledgedMessage public void discard(StoreContext storeContext) throws AMQException { - if (entry.getQueue() != null) + synchronized(_unacknowledgeMessageMap) { - entry.getQueue().dequeue(storeContext, entry); + if(_unacknowledgeMessageMap.contains(deliveryTag)) + { + + if (entry.getQueue() != null) + { + entry.getQueue().dequeue(storeContext, entry); + } + //if the queue is null then the message is waiting to be acked, but has been removed. + entry.getMessage().decrementReference(storeContext); + } } - //if the queue is null then the message is waiting to be acked, but has been removed. - entry.getMessage().decrementReference(storeContext); + } public AMQMessage getMessage() diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 42d9cccb4f..5fa3b6403d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -139,7 +139,7 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); + _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map)); } _acked = acked; _unacked = unacked; -- cgit v1.2.1