summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java147
1 files changed, 118 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 6a2e4f155d..53420ded9b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -24,17 +24,25 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
public class ConflationQueueList extends SimpleQueueEntryList
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
private final String _conflationKey;
private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
+ private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+
public ConflationQueueList(AMQQueue queue, String conflationKey)
{
super(queue);
@@ -52,48 +60,98 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueEntry(this, message);
}
-
+ /**
+ * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
+ */
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- AtomicReference<QueueEntry> latestValueReference = null;
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
- Object value = message.getMessageHeader().getHeader(_conflationKey);
- if(value != null)
+ final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
+ if (keyValue != null)
{
- latestValueReference = _latestValuesMap.get(value);
- if(latestValueReference == null)
+ if(LOGGER.isDebugEnabled())
{
- _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
- latestValueReference = _latestValuesMap.get(value);
+ LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
}
- QueueEntry oldEntry;
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
+ AtomicReference<QueueEntry> entryReferenceFromMap = null;
+ QueueEntry entryFromMap;
+
+ // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
+ // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
+ // indicating that the reference object is no longer valid (it is being removed from the map).
+ boolean keepTryingToUpdateEntryReference = true;
do
{
- oldEntry = latestValueReference.get();
+ do
+ {
+ entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);
+
+ // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
+ entryFromMap = entryReferenceFromMap.get();
+ }
+ while(entryFromMap == _deleteInProgress);
+
+ boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;
+
+ keepTryingToUpdateEntryReference = entryFromMapIsOlder
+ && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
}
- while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(keepTryingToUpdateEntryReference);
- if(oldEntry.compareTo(entry) < 0)
+ if (entryFromMap == _newerEntryAlreadyBeenAndGone)
+ {
+ discardEntry(addedEntry);
+ }
+ else if (entryFromMap.compareTo(addedEntry) > 0)
{
- // We replaced some other entry to become the newest value
- if(oldEntry.acquire())
+ if(LOGGER.isDebugEnabled())
{
- discardEntry(oldEntry);
+ LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
}
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) > 0)
+ else if (entryFromMap.compareTo(addedEntry) < 0)
{
- // A newer entry came along
- discardEntry(entry);
-
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
+ }
+ discardEntry(entryFromMap);
}
+
+ addedEntry.setLatestValueReference(entryReferenceFromMap);
}
- entry.setLatestValueReference(latestValueReference);
- return entry;
+ return addedEntry;
+ }
+
+ /**
+ * Returns:
+ *
+ * <ul>
+ * <li>the existing entry reference if the value already exists in the map, or</li>
+ * <li>referenceToValue if none exists, or</li>
+ * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
+ * adds and removes during execution of this method.</li>
+ * </ul>
+ */
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+ {
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+
+ if(latestValueReference == null)
+ {
+ latestValueReference = _latestValuesMap.get(key);
+ if(latestValueReference == null)
+ {
+ return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ }
+ }
+ return latestValueReference;
}
private void discardEntry(final QueueEntry entry)
@@ -104,11 +162,13 @@ public class ConflationQueueList extends SimpleQueueEntryList
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
entry.discard();
}
+ @Override
public void onRollback()
{
@@ -120,7 +180,6 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
-
private AtomicReference<QueueEntry> _latestValueReference;
public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
@@ -128,25 +187,56 @@ public class ConflationQueueList extends SimpleQueueEntryList
super(queueEntryList, message);
}
-
+ @Override
public void release()
{
super.release();
- if(_latestValueReference != null)
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
+
+ @Override
+ public boolean delete()
+ {
+ if(super.delete())
{
- if(_latestValueReference.get() != this)
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
- discardEntry(this);
+ Object key = getMessageHeader().getHeader(_conflationKey);
+ _latestValuesMap.remove(key,_latestValueReference);
}
+ return true;
+ }
+ else
+ {
+ return false;
}
-
}
public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
+
+ private void discardIfReleasedEntryIsNoLongerLatest()
+ {
+ if(_latestValueReference != null)
+ {
+ if(_latestValueReference.get() != this)
+ {
+ discardEntry(this);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Exposed purposes of unit test only.
+ */
+ Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ {
+ return Collections.unmodifiableMap(_latestValuesMap);
}
static class Factory implements QueueEntryListFactory
@@ -163,5 +253,4 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueList(queue, _conflationKey);
}
}
-
}