summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java73
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java3
3 files changed, 85 insertions, 29 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
index ae7e11afa4..03d05e4b5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Iterator;
@@ -36,12 +38,14 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private final String _groupId;
+ private String _defaultGroup;
private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
private final int _groupMask;
- public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ public AssignedSubscriptionMessageGroupManager(final String groupId, String defaultGroup, final int maxGroups)
{
_groupId = groupId;
+ _defaultGroup = defaultGroup;
_groupMask = pow2(maxGroups)-1;
}
@@ -55,9 +59,21 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return val;
}
+ private Object getKey(QueueEntry entry)
+ {
+ ServerMessage<?> message = entry.getMessage();
+ AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
+ Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
+ {
+ groupVal = _defaultGroup;
+ }
+ return groupVal;
+ }
+
public Subscription getAssignedSubscription(final QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
@@ -75,7 +91,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private boolean assignMessage(Subscription sub, QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
if(groupVal == null)
{
return true;
@@ -107,10 +123,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -131,21 +147,25 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return false;
}
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupId = getKey(entry);
if(groupId == null)
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
Integer group = groupId.hashCode() & _groupMask;
Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == _sub)
+ if(assignedSub == _sub || assignedSub == null)
{
+ //group is either not assigned or is assigned to this subscription
_entry = entry;
return true;
}
else
{
+ //group is already assigned to another subscription
return false;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 55110c46de..b33705e50c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_resetHelper = resetHelper;
}
- public synchronized Subscription getAssignedSubscription(final QueueEntry entry)
+ public Subscription getAssignedSubscription(final QueueEntry entry)
{
Object groupId = getKey(entry);
- Group group = _groupMap.get(groupId);
- return group == null || !group.isValid() ? null : group.getSubscription();
+ if(groupId == null)
+ {
+ return null;
+ }
+ else
+ {
+ synchronized (this)
+ {
+ Group group = _groupMap.get(groupId);
+ return group == null || !group.isValid() ? null : group.getSubscription();
+ }
+ }
}
- public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
+ public boolean acceptMessage(final Subscription sub, final QueueEntry entry)
{
- if(assignMessage(sub, entry))
+ Object groupId = getKey(entry);
+ if(groupId == null)
{
return entry.acquire(sub);
}
else
{
- return false;
+ synchronized (this)
+ {
+ if(assignMessage(sub, entry, groupId))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
}
}
- private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+ private boolean assignMessage(final Subscription sub, final QueueEntry entry, Object groupId)
{
- Object groupId = getKey(entry);
+ if(groupId == null)
+ {
+ return true;
+ }
+
Group group = _groupMap.get(groupId);
if(group == null || !group.isValid())
@@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_groupMap.put(groupId, group);
- // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // there's a small chance that the group became empty between the point at which SAMQQ#getNextAvailableEntry() was
// called on the subscription, and when accept message is called... in that case we want to avoid delivering
// out of order
if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
@@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
Object groupId = getKey(entry);
-
- Group group = _groupMap.get(groupId);
- if(group != null && group.getSubscription() == _sub)
+ if(groupId != null)
{
- _entry = entry;
- return true;
+ Group group = _groupMap.get(groupId);
+ if(group == null || group.getSubscription() == null || group.getSubscription() == _sub)
+ {
+ //group is either not assigned or is assigned to this subscription
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ //group is already assigned to another subscription
+ return false;
+ }
}
else
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
}
@@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
private Object getKey(QueueEntry entry)
{
- ServerMessage message = entry.getMessage();
+ ServerMessage<?> message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
if(groupVal == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
index 8ce4ce3344..4169c704f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
public interface MessageGroupManager
@@ -35,7 +36,7 @@ public interface MessageGroupManager
boolean acceptMessage(Subscription sub, QueueEntry entry);
- QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
+ QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue);
void clearAssignments(Subscription sub);
}