diff options
author | Robert Gemmell <robbie@apache.org> | 2014-01-20 21:33:58 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2014-01-20 21:33:58 +0000 |
commit | 35d78414778d9856d0e441416cb2f7e737ab7627 (patch) | |
tree | e51ddc9d24880beefe2b7fe40933a41d7d0dbdcb | |
parent | defa7dc1fa2e5771ee1df636c9421ed66fb67763 (diff) | |
download | qpid-python-QPID-5496_default_groups.tar.gz |
QPID-5496: WIP on a common way of configuring the default-group for a queue, and allowing queues with 'shared groups' to support non-grouped messagesQPID-5496_default_groups
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-5496_default_groups@1559833 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 109 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b002419064..567fc02675 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -71,6 +71,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; private static final String QPID_NO_GROUP = "qpid.no-group"; private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); + private static final String QPID_NO_DEFAULT_GROUP = "qpid.no-default-group"; // TODO - should make this configurable at the vhost / broker level private static final int DEFAULT_MAX_GROUPS = 255; @@ -246,19 +247,31 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + String messageGroupKey = String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)); + boolean requestedSharedGroups = arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)); + + //Determine the default group value + String defaultGroup = requestedSharedGroups ? DEFAULT_SHARED_MESSAGE_GROUP : null; + if(arguments.containsKey(Queue.MESSAGE_GROUP_DEFAULT_GROUP)) + { + Object defaultGroupArg = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); + + defaultGroup = defaultGroupArg == null ? null : defaultGroupArg.toString(); + } + + //Remove the default group if requested by the configured value + if(QPID_NO_DEFAULT_GROUP.equals(defaultGroup)) + { + defaultGroup = null; + } + + if(requestedSharedGroups) { - Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); - _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), - defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), - this); + _messageGroupManager = new DefinedGroupMessageGroupManager(messageGroupKey, defaultGroup, this); } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( - Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); + _messageGroupManager = new AssignedSubscriptionMessageGroupManager(messageGroupKey, defaultGroup, DEFAULT_MAX_GROUPS); } } else @@ -541,7 +554,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription, this); if(clearAssignments) { _messageGroupManager.clearAssignments(subscription); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java index ae7e11afa4..03d05e4b5d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.server.queue.QueueEntry; import java.util.Iterator; @@ -36,12 +38,14 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private final String _groupId; + private String _defaultGroup; private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>(); private final int _groupMask; - public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups) + public AssignedSubscriptionMessageGroupManager(final String groupId, String defaultGroup, final int maxGroups) { _groupId = groupId; + _defaultGroup = defaultGroup; _groupMask = pow2(maxGroups)-1; } @@ -55,9 +59,21 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana return val; } + private Object getKey(QueueEntry entry) + { + ServerMessage<?> message = entry.getMessage(); + AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); + Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId); + if(groupVal == null) + { + groupVal = _defaultGroup; + } + return groupVal; + } + public Subscription getAssignedSubscription(final QueueEntry entry) { - Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupVal = getKey(entry); return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); } @@ -75,7 +91,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private boolean assignMessage(Subscription sub, QueueEntry entry) { - Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupVal = getKey(entry); if(groupVal == null) { return true; @@ -107,10 +123,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } } - public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub) + public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue) { EntryFinder visitor = new EntryFinder(sub); - sub.getQueue().visit(visitor); + queue.visit(visitor); return visitor.getEntry(); } @@ -131,21 +147,25 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana return false; } - Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId); + Object groupId = getKey(entry); if(groupId == null) { - return false; + //message is not part of a group, anyone who wants it can consume it + _entry = entry; + return true; } Integer group = groupId.hashCode() & _groupMask; Subscription assignedSub = _groupMap.get(group); - if(assignedSub == _sub) + if(assignedSub == _sub || assignedSub == null) { + //group is either not assigned or is assigned to this subscription _entry = entry; return true; } else { + //group is already assigned to another subscription return false; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index 55110c46de..b33705e50c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; @@ -126,29 +126,54 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _resetHelper = resetHelper; } - public synchronized Subscription getAssignedSubscription(final QueueEntry entry) + public Subscription getAssignedSubscription(final QueueEntry entry) { Object groupId = getKey(entry); - Group group = _groupMap.get(groupId); - return group == null || !group.isValid() ? null : group.getSubscription(); + if(groupId == null) + { + return null; + } + else + { + synchronized (this) + { + Group group = _groupMap.get(groupId); + return group == null || !group.isValid() ? null : group.getSubscription(); + } + } } - public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry) + public boolean acceptMessage(final Subscription sub, final QueueEntry entry) { - if(assignMessage(sub, entry)) + Object groupId = getKey(entry); + if(groupId == null) { return entry.acquire(sub); } else { - return false; + synchronized (this) + { + if(assignMessage(sub, entry, groupId)) + { + return entry.acquire(sub); + } + else + { + return false; + } + } } } - private boolean assignMessage(final Subscription sub, final QueueEntry entry) + private boolean assignMessage(final Subscription sub, final QueueEntry entry, Object groupId) { - Object groupId = getKey(entry); + if(groupId == null) + { + return true; + } + Group group = _groupMap.get(groupId); if(group == null || !group.isValid()) @@ -157,7 +182,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _groupMap.put(groupId, group); - // there's a small change that the group became empty between the point at which getNextAvailable() was + // there's a small chance that the group became empty between the point at which SAMQQ#getNextAvailableEntry() was // called on the subscription, and when accept message is called... in that case we want to avoid delivering // out of order if(_resetHelper.isEntryAheadOfSubscription(entry, sub)) @@ -179,10 +204,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub) + public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub, AMQQueue queue) { EntryFinder visitor = new EntryFinder(sub); - sub.getQueue().visit(visitor); + queue.visit(visitor); return visitor.getEntry(); } @@ -204,16 +229,26 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } Object groupId = getKey(entry); - - Group group = _groupMap.get(groupId); - if(group != null && group.getSubscription() == _sub) + if(groupId != null) { - _entry = entry; - return true; + Group group = _groupMap.get(groupId); + if(group == null || group.getSubscription() == null || group.getSubscription() == _sub) + { + //group is either not assigned or is assigned to this subscription + _entry = entry; + return true; + } + else + { + //group is already assigned to another subscription + return false; + } } else { - return false; + //message is not part of a group, anyone who wants it can consume it + _entry = entry; + return true; } } @@ -230,7 +265,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager private Object getKey(QueueEntry entry) { - ServerMessage message = entry.getMessage(); + ServerMessage<?> message = entry.getMessage(); AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId); if(groupVal == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java index 8ce4ce3344..4169c704f5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; public interface MessageGroupManager @@ -35,7 +36,7 @@ public interface MessageGroupManager boolean acceptMessage(Subscription sub, QueueEntry entry); - QueueEntry findEarliestAssignedAvailableEntry(Subscription sub); + QueueEntry findEarliestAssignedAvailableEntry(Subscription sub, AMQQueue queue); void clearAssignments(Subscription sub); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java index cb8ced4ddb..5af8e5a863 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java @@ -227,6 +227,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase producerSession.close(); producerConnection.close(); + //sessions with a prefetch of 1 Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); |