summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java38
1 files changed, 29 insertions, 9 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
index ae7e11afa4..03d05e4b5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Iterator;
@@ -36,12 +38,14 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private final String _groupId;
+ private String _defaultGroup;
private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
private final int _groupMask;
- public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ public AssignedSubscriptionMessageGroupManager(final String groupId, String defaultGroup, final int maxGroups)
{
_groupId = groupId;
+ _defaultGroup = defaultGroup;
_groupMask = pow2(maxGroups)-1;
}
@@ -55,9 +59,21 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return val;
}
+ private Object getKey(QueueEntry entry)
+ {
+ ServerMessage<?> message = entry.getMessage();
+ AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
+ Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
+ {
+ groupVal = _defaultGroup;
+ }
+ return groupVal;
+ }
+
public Subscription getAssignedSubscription(final QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
@@ -75,7 +91,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private boolean assignMessage(Subscription sub, QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
if(groupVal == null)
{
return true;
@@ -107,10 +123,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -131,21 +147,25 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return false;
}
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupId = getKey(entry);
if(groupId == null)
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
Integer group = groupId.hashCode() & _groupMask;
Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == _sub)
+ if(assignedSub == _sub || assignedSub == null)
{
+ //group is either not assigned or is assigned to this subscription
_entry = entry;
return true;
}
else
{
+ //group is already assigned to another subscription
return false;
}
}