diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 147 |
1 files changed, 118 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 6a2e4f155d..53420ded9b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -24,17 +24,25 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; public class ConflationQueueList extends SimpleQueueEntryList { + private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class); private final String _conflationKey; private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); + private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); + private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + public ConflationQueueList(AMQQueue queue, String conflationKey) { super(queue); @@ -52,48 +60,98 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueEntry(this, message); } - + /** + * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. + */ @Override public ConflationQueueEntry add(final ServerMessage message) { - ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); - AtomicReference<QueueEntry> latestValueReference = null; + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); - Object value = message.getMessageHeader().getHeader(_conflationKey); - if(value != null) + final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); + if (keyValue != null) { - latestValueReference = _latestValuesMap.get(value); - if(latestValueReference == null) + if(LOGGER.isDebugEnabled()) { - _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry)); - latestValueReference = _latestValuesMap.get(value); + LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue); } - QueueEntry oldEntry; + final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry); + AtomicReference<QueueEntry> entryReferenceFromMap = null; + QueueEntry entryFromMap; + + // Iterate until we have got a valid atomic reference object and either the referent is newer than the current + // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value + // indicating that the reference object is no longer valid (it is being removed from the map). + boolean keepTryingToUpdateEntryReference = true; do { - oldEntry = latestValueReference.get(); + do + { + entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); + + // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) + entryFromMap = entryReferenceFromMap.get(); + } + while(entryFromMap == _deleteInProgress); + + boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; + + keepTryingToUpdateEntryReference = entryFromMapIsOlder + && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); } - while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry)); + while(keepTryingToUpdateEntryReference); - if(oldEntry.compareTo(entry) < 0) + if (entryFromMap == _newerEntryAlreadyBeenAndGone) + { + discardEntry(addedEntry); + } + else if (entryFromMap.compareTo(addedEntry) > 0) { - // We replaced some other entry to become the newest value - if(oldEntry.acquire()) + if(LOGGER.isDebugEnabled()) { - discardEntry(oldEntry); + LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); } + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) > 0) + else if (entryFromMap.compareTo(addedEntry) < 0) { - // A newer entry came along - discardEntry(entry); - + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); + } + discardEntry(entryFromMap); } + + addedEntry.setLatestValueReference(entryReferenceFromMap); } - entry.setLatestValueReference(latestValueReference); - return entry; + return addedEntry; + } + + /** + * Returns: + * + * <ul> + * <li>the existing entry reference if the value already exists in the map, or</li> + * <li>referenceToValue if none exists, or</li> + * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently + * adds and removes during execution of this method.</li> + * </ul> + */ + private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue) + { + AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + + if(latestValueReference == null) + { + latestValueReference = _latestValuesMap.get(key); + if(latestValueReference == null) + { + return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone); + } + } + return latestValueReference; } private void discardEntry(final QueueEntry entry) @@ -104,11 +162,13 @@ public class ConflationQueueList extends SimpleQueueEntryList txn.dequeue(entry.getQueue(),entry.getMessage(), new ServerTransaction.Action() { + @Override public void postCommit() { entry.discard(); } + @Override public void onRollback() { @@ -120,7 +180,6 @@ public class ConflationQueueList extends SimpleQueueEntryList private final class ConflationQueueEntry extends SimpleQueueEntryImpl { - private AtomicReference<QueueEntry> _latestValueReference; public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message) @@ -128,25 +187,56 @@ public class ConflationQueueList extends SimpleQueueEntryList super(queueEntryList, message); } - + @Override public void release() { super.release(); - if(_latestValueReference != null) + discardIfReleasedEntryIsNoLongerLatest(); + } + + @Override + public boolean delete() + { + if(super.delete()) { - if(_latestValueReference.get() != this) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { - discardEntry(this); + Object key = getMessageHeader().getHeader(_conflationKey); + _latestValuesMap.remove(key,_latestValueReference); } + return true; + } + else + { + return false; } - } public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) { _latestValueReference = latestValueReference; } + + private void discardIfReleasedEntryIsNoLongerLatest() + { + if(_latestValueReference != null) + { + if(_latestValueReference.get() != this) + { + discardEntry(this); + } + } + } + + } + + /** + * Exposed purposes of unit test only. + */ + Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap() + { + return Collections.unmodifiableMap(_latestValuesMap); } static class Factory implements QueueEntryListFactory @@ -163,5 +253,4 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueList(queue, _conflationKey); } } - } |