diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-05-17 12:12:34 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-17 12:12:34 +0000 |
commit | b83d2c7f5d2cb6ef51459574f7fa4fe4bd629563 (patch) | |
tree | b7f2bdb78a041b977fd76a631d26d275f83f97f2 | |
parent | 0843e8e4506a4b9e05c56822e50c7fd670ab531f (diff) | |
download | qpid-python-b83d2c7f5d2cb6ef51459574f7fa4fe4bd629563.tar.gz |
Fix for broken CSDM message purging routine that was causing python test_get to fail.
Replaced long while control with a method call that is easier to understand and has more comments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@538882 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 56 |
1 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index bdc2189676..0fb5e6d88a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (message != null - && ( - ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) - || sub == null) - && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired - || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired - ) + while (purgeMessage(message, sub)) { //remove the already taken message or expired AMQMessage removed = messages.poll(); @@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } + /** + * + * @param message + * @param sub + * @return + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException + { + //Original.. complicated while loop control +// (message != null +// && ( +// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) +// || sub == null) +// && message.taken(_queue, sub)); + + boolean purge = false; + + // if the message is null then don't purge as we have no messagse. + if (message != null) + { + // if we have a subscriber perform message checks + if (sub != null) + { + // Check that the message hasn't expired. + if (message.expired(sub.getChannel().getStoreContext(), _queue)) + { + return true; + } + + // if we have a queue browser(we don't purge) so check mark the message as taken + purge = ((!sub.isBrowser() || message.isTaken(_queue))); + } + else + { + // if there is no subscription we are doing + // a get or purging so mark message as taken. + message.isTaken(_queue); + // and then ensure that it gets purged + purge = true; + } + } + + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) { |