diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index 55110c46de..b33705e50c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; @@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _resetHelper = resetHelper; } - public synchronized Subscription getAssignedSubscription(final QueueEntry entry) + public Subscription getAssignedSubscription(final QueueEntry entry) { Object groupId = getKey(entry); - Group group = _groupMap.get(groupId); - return group == null || !group.isValid() ? null : group.getSubscription(); + if(groupId == null) + { + return null; + } + else + { + synchronized (this) + { + Group group = _groupMap.get(groupId); + return group == null || !group.isValid() ? null : group.getSubscription(); + } + } } - public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry) + public boolean acceptMessage(final Subscription sub, final QueueEntry entry) { - if(assignMessage(sub, entry)) + Object groupId = getKey(entry); + if(groupId == null) { return entry.acquire(sub); } else { - return false; + synchronized (this) + { + if(assignMessage(sub, entry, groupId)) + { + return entry.acquire(sub); + } + else + { + return false; + } + } } } - private boolean assignMessage(final Subscription sub, final QueueEntry entry) + private boolean assignMessage(final Subscription sub, final QueueEntry entry, Object groupId) { - Object groupId = getKey(entry); + if(groupId == null) + { + return true; + } + Group group = _groupMap.get(groupId); if(group == null || !group.isValid()) @@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _groupMap.put(groupId, group); - // there's a small change that the group became empty between the point at which getNextAvailable() was + // there's a small chance that the group became empty between the point at which SAMQQ#getNextAvailableEntry() was // called on the subscription, and when accept message is called... in that case we want to avoid delivering // out of order if(_resetHelper.isEntryAheadOfSubscription(entry, sub)) @@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub) + public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub, AMQQueue queue) { EntryFinder visitor = new EntryFinder(sub); - sub.getQueue().visit(visitor); + queue.visit(visitor); return visitor.getEntry(); } @@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } Object groupId = getKey(entry); - - Group group = _groupMap.get(groupId); - if(group != null && group.getSubscription() == _sub) + if(groupId != null) { - _entry = entry; - return true; + Group group = _groupMap.get(groupId); + if(group == null || group.getSubscription() == null || group.getSubscription() == _sub) + { + //group is either not assigned or is assigned to this subscription + _entry = entry; + return true; + } + else + { + //group is already assigned to another subscription + return false; + } } else { - return false; + //message is not part of a group, anyone who wants it can consume it + _entry = entry; + return true; } } @@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager private Object getKey(QueueEntry entry) { - ServerMessage message = entry.getMessage(); + ServerMessage<?> message = entry.getMessage(); AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId); if(groupVal == null) |