diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java | 38 |
1 files changed, 29 insertions, 9 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java index ae7e11afa4..03d05e4b5d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.server.queue.QueueEntry; import java.util.Iterator; @@ -36,12 +38,14 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private final String _groupId; + private String _defaultGroup; private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>(); private final int _groupMask; - public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups) + public AssignedSubscriptionMessageGroupManager(final String groupId, String defaultGroup, final int maxGroups) { _groupId = groupId; + _defaultGroup = defaultGroup; _groupMask = pow2(maxGroups)-1; } @@ -55,9 +59,21 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana return val; } + private Object getKey(QueueEntry entry) + { + ServerMessage<?> message = entry.getMessage(); + AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); + Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId); + if(groupVal == null) + { + groupVal = _defaultGroup; + } + return groupVal; + } + public Subscription getAssignedSubscription(final QueueEntry entry) { - Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupVal = getKey(entry); return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); } @@ -75,7 +91,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private boolean assignMessage(Subscription sub, QueueEntry entry) { - Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupVal = getKey(entry); if(groupVal == null) { return true; @@ -107,10 +123,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } } - public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub) + public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue) { EntryFinder visitor = new EntryFinder(sub); - sub.getQueue().visit(visitor); + queue.visit(visitor); return visitor.getEntry(); } @@ -131,21 +147,25 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana return false; } - Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupId = getKey(entry); if(groupId == null) { - return false; + //message is not part of a group, anyone who wants it can consume it + _entry = entry; + return true; } Integer group = groupId.hashCode() & _groupMask; Subscription assignedSub = _groupMap.get(group); - if(assignedSub == _sub) + if(assignedSub == _sub || assignedSub == null) { + //group is either not assigned or is assigned to this subscription _entry = entry; return true; } else { + //group is already assigned to another subscription return false; } } |