summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java65
1 files changed, 43 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 7469e95394..a98a4ac144 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -32,23 +32,38 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-public class ConflationQueueList extends SimpleQueueEntryList
+public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
+ private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+ {
+
+ @Override
+ public ConflationQueueEntry createHead(final ConflationQueueList list)
+ {
+ return list.createHead();
+ }
+ };
+
private final String _conflationKey;
- private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
- new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap =
+ new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>();
- private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
- private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+ private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
+ private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
+ public ConflationQueueList(ConflationQueue queue, String conflationKey)
{
- super(queue);
+ super(queue, HEAD_CREATOR);
_conflationKey = conflationKey;
}
+ private ConflationQueueEntry createHead()
+ {
+ return new ConflationQueueEntry(this);
+ }
+
public String getConflationKey()
{
return _conflationKey;
@@ -66,7 +81,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
+ final ConflationQueueEntry addedEntry = super.add(message);
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
@@ -76,14 +91,14 @@ public class ConflationQueueList extends SimpleQueueEntryList
LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
}
- final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
- AtomicReference<QueueEntry> entryReferenceFromMap = null;
- QueueEntry entryFromMap;
+ final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
+ AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
+ ConflationQueueEntry entryFromMap;
// Iterate until we have got a valid atomic reference object and either the referent is newer than the current
// entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
// indicating that the reference object is no longer valid (it is being removed from the map).
- boolean keepTryingToUpdateEntryReference = true;
+ boolean keepTryingToUpdateEntryReference;
do
{
do
@@ -139,16 +154,16 @@ public class ConflationQueueList extends SimpleQueueEntryList
* adds and removes during execution of this method.</li>
* </ul>
*/
- private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+ private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue)
{
- AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+ AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
if(latestValueReference == null)
{
latestValueReference = _latestValuesMap.get(key);
if(latestValueReference == null)
{
- return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone);
}
}
return latestValueReference;
@@ -177,12 +192,17 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
}
- private final class ConflationQueueEntry extends SimpleQueueEntryImpl
+ final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
- private AtomicReference<QueueEntry> _latestValueReference;
+ private AtomicReference<ConflationQueueEntry> _latestValueReference;
+
+ private ConflationQueueEntry(final ConflationQueueList queueEntryList)
+ {
+ super(queueEntryList);
+ }
- public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
@@ -206,7 +226,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
- public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+ public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
@@ -227,12 +247,12 @@ public class ConflationQueueList extends SimpleQueueEntryList
/**
* Exposed purposes of unit test only.
*/
- Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap()
{
return Collections.unmodifiableMap(_latestValuesMap);
}
- static class Factory implements QueueEntryListFactory
+ static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private final String _conflationKey;
@@ -241,7 +261,8 @@ public class ConflationQueueList extends SimpleQueueEntryList
_conflationKey = conflationKey;
}
- public ConflationQueueList createQueueEntryList(AMQQueue queue)
+ @Override
+ public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
{
return new ConflationQueueList(queue, _conflationKey);
}