diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 7469e95394..a98a4ac144 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -32,23 +32,38 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -public class ConflationQueueList extends SimpleQueueEntryList +public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class); + private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>() + { + + @Override + public ConflationQueueEntry createHead(final ConflationQueueList list) + { + return list.createHead(); + } + }; + private final String _conflationKey; - private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = - new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); + private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap = + new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>(); - private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); - private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); + private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); - public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey) + public ConflationQueueList(ConflationQueue queue, String conflationKey) { - super(queue); + super(queue, HEAD_CREATOR); _conflationKey = conflationKey; } + private ConflationQueueEntry createHead() + { + return new ConflationQueueEntry(this); + } + public String getConflationKey() { return _conflationKey; @@ -66,7 +81,7 @@ public class ConflationQueueList extends SimpleQueueEntryList @Override public ConflationQueueEntry add(final ServerMessage message) { - final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); + final ConflationQueueEntry addedEntry = super.add(message); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) @@ -76,14 +91,14 @@ public class ConflationQueueList extends SimpleQueueEntryList LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue); } - final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry); - AtomicReference<QueueEntry> entryReferenceFromMap = null; - QueueEntry entryFromMap; + final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry); + AtomicReference<ConflationQueueEntry> entryReferenceFromMap; + ConflationQueueEntry entryFromMap; // Iterate until we have got a valid atomic reference object and either the referent is newer than the current // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value // indicating that the reference object is no longer valid (it is being removed from the map). - boolean keepTryingToUpdateEntryReference = true; + boolean keepTryingToUpdateEntryReference; do { do @@ -139,16 +154,16 @@ public class ConflationQueueList extends SimpleQueueEntryList * adds and removes during execution of this method.</li> * </ul> */ - private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue) + private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue) { - AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); if(latestValueReference == null) { latestValueReference = _latestValuesMap.get(key); if(latestValueReference == null) { - return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone); + return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone); } } return latestValueReference; @@ -177,12 +192,17 @@ public class ConflationQueueList extends SimpleQueueEntryList } } - private final class ConflationQueueEntry extends SimpleQueueEntryImpl + final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList> { - private AtomicReference<QueueEntry> _latestValueReference; + private AtomicReference<ConflationQueueEntry> _latestValueReference; + + private ConflationQueueEntry(final ConflationQueueList queueEntryList) + { + super(queueEntryList); + } - public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message) + public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message) { super(queueEntryList, message); } @@ -206,7 +226,7 @@ public class ConflationQueueList extends SimpleQueueEntryList } - public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) + public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference) { _latestValueReference = latestValueReference; } @@ -227,12 +247,12 @@ public class ConflationQueueList extends SimpleQueueEntryList /** * Exposed purposes of unit test only. */ - Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap() + Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap() { return Collections.unmodifiableMap(_latestValuesMap); } - static class Factory implements QueueEntryListFactory + static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { private final String _conflationKey; @@ -241,7 +261,8 @@ public class ConflationQueueList extends SimpleQueueEntryList _conflationKey = conflationKey; } - public ConflationQueueList createQueueEntryList(AMQQueue queue) + @Override + public ConflationQueueList createQueueEntryList(final ConflationQueue queue) { return new ConflationQueueList(queue, _conflationKey); } |