diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java | 48 |
1 files changed, 20 insertions, 28 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index 4c74e5ba0b..e67591ae07 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -32,22 +32,22 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.HashMap; import java.util.Map; -public class DefinedGroupMessageGroupManager implements MessageGroupManager +public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L> { private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class); private final String _groupId; private final String _defaultGroup; private final Map<Object, Group> _groupMap = new HashMap<Object, Group>(); - private final ConsumerResetHelper _resetHelper; + private final ConsumerResetHelper<E,Q,L> _resetHelper; private final class Group { private final Object _group; - private QueueConsumer _consumer; + private QueueConsumer<?,E,Q,L> _consumer; private int _activeCount; - private Group(final Object key, final QueueConsumer consumer) + private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer) { _group = key; _consumer = consumer; @@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed())); } - public QueueConsumer getConsumer() + public QueueConsumer<?,E,Q,L> getConsumer() { return _consumer; } @@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper) + public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper) { _groupId = groupId; _defaultGroup = defaultGroup; _resetHelper = resetHelper; } - public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry) + public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry) { Object groupId = getKey(entry); @@ -135,19 +135,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return group == null || !group.isValid() ? null : group.getConsumer(); } - public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry) + public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry) { - if(assignMessage(sub, entry)) - { - return entry.acquire(sub); - } - else - { - return false; - } + return assignMessage(sub, entry) && entry.acquire(sub); } - private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry) + private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry) { Object groupId = getKey(entry); Group group = _groupMap.get(groupId); @@ -171,7 +164,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager if(assignedSub == sub) { - entry.addStateChangeListener(new GroupStateChangeListener(group, entry)); + entry.addStateChangeListener(new GroupStateChangeListener(group)); return true; } else @@ -180,16 +173,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub) + public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub) { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); return visitor.getEntry(); } - private class EntryFinder implements QueueEntryVisitor + private class EntryFinder implements QueueEntryVisitor<E> { - private QueueEntry _entry; + private E _entry; private QueueConsumer _sub; public EntryFinder(final QueueConsumer sub) @@ -197,7 +190,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _sub = sub; } - public boolean visit(final QueueEntry entry) + public boolean visit(final E entry) { if(!entry.isAvailable()) { @@ -218,7 +211,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public QueueEntry getEntry() + public E getEntry() { return _entry; } @@ -229,7 +222,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { } - private Object getKey(QueueEntry entry) + private Object getKey(E entry) { ServerMessage message = entry.getMessage(); AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); @@ -241,17 +234,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State> + private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State> { private final Group _group; - public GroupStateChangeListener(final Group group, - final MessageInstance<QueueConsumer> entry) + public GroupStateChangeListener(final Group group) { _group = group; } - public void stateChanged(final MessageInstance<QueueConsumer> entry, + public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry, final MessageInstance.State oldState, final MessageInstance.State newState) { |