summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-05-17 12:12:34 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-05-17 12:12:34 +0000
commitb83d2c7f5d2cb6ef51459574f7fa4fe4bd629563 (patch)
treeb7f2bdb78a041b977fd76a631d26d275f83f97f2
parent0843e8e4506a4b9e05c56822e50c7fd670ab531f (diff)
downloadqpid-python-b83d2c7f5d2cb6ef51459574f7fa4fe4bd629563.tar.gz
Fix for broken CSDM message purging routine that was causing python test_get to fail.
Replaced long while control with a method call that is easier to understand and has more comments. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@538882 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java56
1 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index bdc2189676..0fb5e6d88a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (message != null
- && (
- ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
- || sub == null)
- && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
- || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
- )
+ while (purgeMessage(message, sub))
{
//remove the already taken message or expired
AMQMessage removed = messages.poll();
@@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
+ /**
+ *
+ * @param message
+ * @param sub
+ * @return
+ * @throws AMQException
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ {
+ //Original.. complicated while loop control
+// (message != null
+// && (
+// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+// || sub == null)
+// && message.taken(_queue, sub));
+
+ boolean purge = false;
+
+ // if the message is null then don't purge as we have no messagse.
+ if (message != null)
+ {
+ // if we have a subscriber perform message checks
+ if (sub != null)
+ {
+ // Check that the message hasn't expired.
+ if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ {
+ return true;
+ }
+
+ // if we have a queue browser(we don't purge) so check mark the message as taken
+ purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ }
+ else
+ {
+ // if there is no subscription we are doing
+ // a get or purging so mark message as taken.
+ message.isTaken(_queue);
+ // and then ensure that it gets purged
+ purge = true;
+ }
+ }
+
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+ return purge && message.taken(_queue, sub);
+ }
+
public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{