summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java73
1 files changed, 54 insertions, 19 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 55110c46de..b33705e50c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_resetHelper = resetHelper;
}
- public synchronized Subscription getAssignedSubscription(final QueueEntry entry)
+ public Subscription getAssignedSubscription(final QueueEntry entry)
{
Object groupId = getKey(entry);
- Group group = _groupMap.get(groupId);
- return group == null || !group.isValid() ? null : group.getSubscription();
+ if(groupId == null)
+ {
+ return null;
+ }
+ else
+ {
+ synchronized (this)
+ {
+ Group group = _groupMap.get(groupId);
+ return group == null || !group.isValid() ? null : group.getSubscription();
+ }
+ }
}
- public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
+ public boolean acceptMessage(final Subscription sub, final QueueEntry entry)
{
- if(assignMessage(sub, entry))
+ Object groupId = getKey(entry);
+ if(groupId == null)
{
return entry.acquire(sub);
}
else
{
- return false;
+ synchronized (this)
+ {
+ if(assignMessage(sub, entry, groupId))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
}
}
- private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+ private boolean assignMessage(final Subscription sub, final QueueEntry entry, Object groupId)
{
- Object groupId = getKey(entry);
+ if(groupId == null)
+ {
+ return true;
+ }
+
Group group = _groupMap.get(groupId);
if(group == null || !group.isValid())
@@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_groupMap.put(groupId, group);
- // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // there's a small chance that the group became empty between the point at which SAMQQ#getNextAvailableEntry() was
// called on the subscription, and when accept message is called... in that case we want to avoid delivering
// out of order
if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
@@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
Object groupId = getKey(entry);
-
- Group group = _groupMap.get(groupId);
- if(group != null && group.getSubscription() == _sub)
+ if(groupId != null)
{
- _entry = entry;
- return true;
+ Group group = _groupMap.get(groupId);
+ if(group == null || group.getSubscription() == null || group.getSubscription() == _sub)
+ {
+ //group is either not assigned or is assigned to this subscription
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ //group is already assigned to another subscription
+ return false;
+ }
}
else
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
}
@@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
private Object getKey(QueueEntry entry)
{
- ServerMessage message = entry.getMessage();
+ ServerMessage<?> message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
if(groupVal == null)