summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java48
1 files changed, 20 insertions, 28 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index 4c74e5ba0b..e67591ae07 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -32,22 +32,22 @@ import org.apache.qpid.server.message.ServerMessage;
import java.util.HashMap;
import java.util.Map;
-public class DefinedGroupMessageGroupManager implements MessageGroupManager
+public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L>
{
private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
private final String _groupId;
private final String _defaultGroup;
private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
- private final ConsumerResetHelper _resetHelper;
+ private final ConsumerResetHelper<E,Q,L> _resetHelper;
private final class Group
{
private final Object _group;
- private QueueConsumer _consumer;
+ private QueueConsumer<?,E,Q,L> _consumer;
private int _activeCount;
- private Group(final Object key, final QueueConsumer consumer)
+ private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer)
{
_group = key;
_consumer = consumer;
@@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
}
- public QueueConsumer getConsumer()
+ public QueueConsumer<?,E,Q,L> getConsumer()
{
return _consumer;
}
@@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper)
{
_groupId = groupId;
_defaultGroup = defaultGroup;
_resetHelper = resetHelper;
}
- public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry)
+ public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry)
{
Object groupId = getKey(entry);
@@ -135,19 +135,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return group == null || !group.isValid() ? null : group.getConsumer();
}
- public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry)
+ public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- if(assignMessage(sub, entry))
- {
- return entry.acquire(sub);
- }
- else
- {
- return false;
- }
+ return assignMessage(sub, entry) && entry.acquire(sub);
}
- private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry)
+ private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
@@ -171,7 +164,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
if(assignedSub == sub)
{
- entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
+ entry.addStateChangeListener(new GroupStateChangeListener(group));
return true;
}
else
@@ -180,16 +173,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub)
+ public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
return visitor.getEntry();
}
- private class EntryFinder implements QueueEntryVisitor
+ private class EntryFinder implements QueueEntryVisitor<E>
{
- private QueueEntry _entry;
+ private E _entry;
private QueueConsumer _sub;
public EntryFinder(final QueueConsumer sub)
@@ -197,7 +190,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_sub = sub;
}
- public boolean visit(final QueueEntry entry)
+ public boolean visit(final E entry)
{
if(!entry.isAvailable())
{
@@ -218,7 +211,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public QueueEntry getEntry()
+ public E getEntry()
{
return _entry;
}
@@ -229,7 +222,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
}
- private Object getKey(QueueEntry entry)
+ private Object getKey(E entry)
{
ServerMessage message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
@@ -241,17 +234,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State>
{
private final Group _group;
- public GroupStateChangeListener(final Group group,
- final MessageInstance<QueueConsumer> entry)
+ public GroupStateChangeListener(final Group group)
{
_group = group;
}
- public void stateChanged(final MessageInstance<QueueConsumer> entry,
+ public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry,
final MessageInstance.State oldState,
final MessageInstance.State newState)
{