summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2014-01-20 21:33:58 +0000
committerRobert Gemmell <robbie@apache.org>2014-01-20 21:33:58 +0000
commit35d78414778d9856d0e441416cb2f7e737ab7627 (patch)
treee51ddc9d24880beefe2b7fe40933a41d7d0dbdcb
parentdefa7dc1fa2e5771ee1df636c9421ed66fb67763 (diff)
downloadqpid-python-QPID-5496_default_groups.tar.gz
QPID-5496: WIP on a common way of configuring the default-group for a queue, and allowing queues with 'shared groups' to support non-grouped messagesQPID-5496_default_groups
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-5496_default_groups@1559833 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java73
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java1
5 files changed, 109 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b002419064..567fc02675 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -71,6 +71,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
private static final String QPID_NO_GROUP = "qpid.no-group";
private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
+ private static final String QPID_NO_DEFAULT_GROUP = "qpid.no-default-group";
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
@@ -246,19 +247,31 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
{
- if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
- && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ String messageGroupKey = String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY));
+ boolean requestedSharedGroups = arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS));
+
+ //Determine the default group value
+ String defaultGroup = requestedSharedGroups ? DEFAULT_SHARED_MESSAGE_GROUP : null;
+ if(arguments.containsKey(Queue.MESSAGE_GROUP_DEFAULT_GROUP))
+ {
+ Object defaultGroupArg = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+
+ defaultGroup = defaultGroupArg == null ? null : defaultGroupArg.toString();
+ }
+
+ //Remove the default group if requested by the configured value
+ if(QPID_NO_DEFAULT_GROUP.equals(defaultGroup))
+ {
+ defaultGroup = null;
+ }
+
+ if(requestedSharedGroups)
{
- Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
- _messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
- defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
- this);
+ _messageGroupManager = new DefinedGroupMessageGroupManager(messageGroupKey, defaultGroup, this);
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
- Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(messageGroupKey, defaultGroup, DEFAULT_MAX_GROUPS);
}
}
else
@@ -541,7 +554,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription, this);
if(clearAssignments)
{
_messageGroupManager.clearAssignments(subscription);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
index ae7e11afa4..03d05e4b5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Iterator;
@@ -36,12 +38,14 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private final String _groupId;
+ private String _defaultGroup;
private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
private final int _groupMask;
- public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ public AssignedSubscriptionMessageGroupManager(final String groupId, String defaultGroup, final int maxGroups)
{
_groupId = groupId;
+ _defaultGroup = defaultGroup;
_groupMask = pow2(maxGroups)-1;
}
@@ -55,9 +59,21 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return val;
}
+ private Object getKey(QueueEntry entry)
+ {
+ ServerMessage<?> message = entry.getMessage();
+ AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
+ Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
+ {
+ groupVal = _defaultGroup;
+ }
+ return groupVal;
+ }
+
public Subscription getAssignedSubscription(final QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
@@ -75,7 +91,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private boolean assignMessage(Subscription sub, QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupVal = getKey(entry);
if(groupVal == null)
{
return true;
@@ -107,10 +123,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -131,21 +147,25 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return false;
}
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ Object groupId = getKey(entry);
if(groupId == null)
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
Integer group = groupId.hashCode() & _groupMask;
Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == _sub)
+ if(assignedSub == _sub || assignedSub == null)
{
+ //group is either not assigned or is assigned to this subscription
_entry = entry;
return true;
}
else
{
+ //group is already assigned to another subscription
return false;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 55110c46de..b33705e50c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_resetHelper = resetHelper;
}
- public synchronized Subscription getAssignedSubscription(final QueueEntry entry)
+ public Subscription getAssignedSubscription(final QueueEntry entry)
{
Object groupId = getKey(entry);
- Group group = _groupMap.get(groupId);
- return group == null || !group.isValid() ? null : group.getSubscription();
+ if(groupId == null)
+ {
+ return null;
+ }
+ else
+ {
+ synchronized (this)
+ {
+ Group group = _groupMap.get(groupId);
+ return group == null || !group.isValid() ? null : group.getSubscription();
+ }
+ }
}
- public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
+ public boolean acceptMessage(final Subscription sub, final QueueEntry entry)
{
- if(assignMessage(sub, entry))
+ Object groupId = getKey(entry);
+ if(groupId == null)
{
return entry.acquire(sub);
}
else
{
- return false;
+ synchronized (this)
+ {
+ if(assignMessage(sub, entry, groupId))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
}
}
- private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+ private boolean assignMessage(final Subscription sub, final QueueEntry entry, Object groupId)
{
- Object groupId = getKey(entry);
+ if(groupId == null)
+ {
+ return true;
+ }
+
Group group = _groupMap.get(groupId);
if(group == null || !group.isValid())
@@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_groupMap.put(groupId, group);
- // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // there's a small chance that the group became empty between the point at which SAMQQ#getNextAvailableEntry() was
// called on the subscription, and when accept message is called... in that case we want to avoid delivering
// out of order
if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
@@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub, AMQQueue queue)
{
EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
+ queue.visit(visitor);
return visitor.getEntry();
}
@@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
Object groupId = getKey(entry);
-
- Group group = _groupMap.get(groupId);
- if(group != null && group.getSubscription() == _sub)
+ if(groupId != null)
{
- _entry = entry;
- return true;
+ Group group = _groupMap.get(groupId);
+ if(group == null || group.getSubscription() == null || group.getSubscription() == _sub)
+ {
+ //group is either not assigned or is assigned to this subscription
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ //group is already assigned to another subscription
+ return false;
+ }
}
else
{
- return false;
+ //message is not part of a group, anyone who wants it can consume it
+ _entry = entry;
+ return true;
}
}
@@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
private Object getKey(QueueEntry entry)
{
- ServerMessage message = entry.getMessage();
+ ServerMessage<?> message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
if(groupVal == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
index 8ce4ce3344..4169c704f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
public interface MessageGroupManager
@@ -35,7 +36,7 @@ public interface MessageGroupManager
boolean acceptMessage(Subscription sub, QueueEntry entry);
- QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
+ QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue);
void clearAssignments(Subscription sub);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index cb8ced4ddb..5af8e5a863 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -227,6 +227,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producerSession.close();
producerConnection.close();
+ //sessions with a prefetch of 1
Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);