summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-16 15:20:52 +0000
committerAidan Skinner <aidan@apache.org>2008-05-16 15:20:52 +0000
commit2360f1b87fbe721f71e6a63065c489ca552c9b0c (patch)
tree91567ae0178870d59293380c79ba029569c8ac91
parent716bf9a824491e93f39bc79ccd0caf9fbb36f2ee (diff)
downloadqpid-python-2360f1b87fbe721f71e6a63065c489ca552c9b0c.tar.gz
Merged revisions 653416 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@657101 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java2
3 files changed, 20 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 17b4fa5d65..1df06a4911 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -469,7 +469,7 @@ public class AMQChannel
synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag));
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap));
checkSuspension();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index df7cecc940..0112d3b388 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -34,13 +34,18 @@ public class UnacknowledgedMessage
public final long deliveryTag;
private boolean _queueDeleted;
+ private final UnacknowledgedMessageMap _unacknowledgeMessageMap;
- public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag)
+ public UnacknowledgedMessage(QueueEntry entry,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ final UnacknowledgedMessageMap unacknowledgedMessageMap)
{
this.entry = entry;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
+ _unacknowledgeMessageMap = unacknowledgedMessageMap;
}
public String toString()
@@ -60,12 +65,20 @@ public class UnacknowledgedMessage
public void discard(StoreContext storeContext) throws AMQException
{
- if (entry.getQueue() != null)
+ synchronized(_unacknowledgeMessageMap)
{
- entry.getQueue().dequeue(storeContext, entry);
+ if(_unacknowledgeMessageMap.contains(deliveryTag))
+ {
+
+ if (entry.getQueue() != null)
+ {
+ entry.getQueue().dequeue(storeContext, entry);
+ }
+ //if the queue is null then the message is waiting to be acked, but has been removed.
+ entry.getMessage().decrementReference(storeContext);
+ }
}
- //if the queue is null then the message is waiting to be acked, but has been removed.
- entry.getMessage().decrementReference(storeContext);
+
}
public AMQMessage getMessage()
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 42d9cccb4f..5fa3b6403d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -139,7 +139,7 @@ public class TxAckTest extends TestCase
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map));
}
_acked = acked;
_unacked = unacked;