summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-09 12:17:57 +0000
committerKeith Wall <kwall@apache.org>2012-05-09 12:17:57 +0000
commiteac63c52d9b8cd6f722bade2ab92861bfdd5f30d (patch)
treea0d75f5468d99399181fd09d1545a22774845345 /qpid
parentf982f86ceede56c2fa153e1cef21a31a75b5a669 (diff)
downloadqpid-python-eac63c52d9b8cd6f722bade2ab92861bfdd5f30d.tar.gz
QPID-3979: [Java Broker] Last value queue memory leak
The failure to remove entries from the _latestValuesMap caused leak of ConflationQueueList entries. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java85
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java211
2 files changed, 275 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 75e6f2cfdc..d8467d2d8e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -25,6 +25,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -52,29 +54,29 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueEntry(this, message);
}
-
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- AtomicReference<QueueEntry> latestValueReference = null;
+ final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- Object value = message.getMessageHeader().getHeader(_conflationKey);
- if(value != null)
+ final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
+ if (keyValue != null)
{
- latestValueReference = _latestValuesMap.get(value);
- if(latestValueReference == null)
- {
- _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
- latestValueReference = _latestValuesMap.get(value);
- }
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry);
+ AtomicReference<QueueEntry> latestValueReference = null;
QueueEntry oldEntry;
+ // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
+ // entry, or the current entry has replaced it in the reference. Note that the head represents a special value
+ // indicating that the reference object is no longer valid (it is being removed from the map).
do
{
+ latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry);
oldEntry = latestValueReference.get();
}
- while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(oldEntry.compareTo(entry) < 0
+ && oldEntry != getHead()
+ && !latestValueReference.compareAndSet(oldEntry, entry));
if(oldEntry.compareTo(entry) < 0)
{
@@ -85,14 +87,24 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
// A newer entry came along
discardEntry(entry);
-
}
+
+ entry.setLatestValueReference(latestValueReference);
}
- entry.setLatestValueReference(latestValueReference);
return entry;
}
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue)
+ {
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue);
+ if(latestValueReference == null)
+ {
+ latestValueReference = _latestValuesMap.get(key);
+ }
+ return latestValueReference;
+ }
+
private void discardEntry(final QueueEntry entry)
{
if(entry.acquire())
@@ -101,11 +113,13 @@ public class ConflationQueueList extends SimpleQueueEntryList
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
entry.discard();
}
+ @Override
public void onRollback()
{
@@ -117,7 +131,6 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
-
private AtomicReference<QueueEntry> _latestValueReference;
public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
@@ -125,25 +138,56 @@ public class ConflationQueueList extends SimpleQueueEntryList
super(queueEntryList, message);
}
-
+ @Override
public void release()
{
super.release();
- if(_latestValueReference != null)
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
+
+ @Override
+ public boolean delete()
+ {
+ if(super.delete())
{
- if(_latestValueReference.get() != this)
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead()))
{
- discardEntry(this);
+ Object key = getMessageHeader().getHeader(_conflationKey);
+ _latestValuesMap.remove(key,_latestValueReference);
}
+ return true;
+ }
+ else
+ {
+ return false;
}
-
}
public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
+
+ private void discardIfReleasedEntryIsNoLongerLatest()
+ {
+ if(_latestValueReference != null)
+ {
+ if(_latestValueReference.get() != this)
+ {
+ discardEntry(this);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Exposed purposes of unit test only.
+ */
+ Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ {
+ return Collections.unmodifiableMap(_latestValuesMap);
}
static class Factory implements QueueEntryListFactory
@@ -160,5 +204,4 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueList(queue, _conflationKey);
}
}
-
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
new file mode 100644
index 0000000000..a94548f1c3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessageReference;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ConflationQueueListTest extends TestCase
+{
+ private static final String CONFLATION_KEY = "CONFLATION_KEY";
+
+ private static final String TEST_KEY_VALUE = "testKeyValue";
+ private static final String TEST_KEY_VALUE1 = "testKeyValue1";
+ private static final String TEST_KEY_VALUE2 = "testKeyValue2";
+
+ private ConflationQueueList _list;
+ private AMQQueue _queue = createTestQueue();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _list = new ConflationQueueList(_queue, CONFLATION_KEY);
+ }
+
+ public void testListHasNoEntries()
+ {
+ int numberOfEntries = countEntries(_list);
+ assertEquals(0, numberOfEntries);
+ }
+
+ public void testAddMessageWithoutConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message = createTestServerMessage(null);
+
+ _list.add(message);
+ int numberOfEntries = countEntries(_list);
+ assertEquals(1, numberOfEntries);
+ }
+
+ public void testAddAndDiscardMessageWithoutConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message = createTestServerMessage(null);
+
+ QueueEntry addedEntry = _list.add(message);
+ addedEntry.discard();
+
+ int numberOfEntries = countEntries(_list);
+ assertEquals(0, numberOfEntries);
+ }
+
+ public void testAddMessageWithConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message = createTestServerMessage(TEST_KEY_VALUE);
+
+ _list.add(message);
+ int numberOfEntries = countEntries(_list);
+ assertEquals(1, numberOfEntries);
+ }
+
+ public void testAddAndRemoveMessageWithConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message = createTestServerMessage(TEST_KEY_VALUE);
+
+ QueueEntry addedEntry = _list.add(message);
+ addedEntry.discard();
+
+ int numberOfEntries = countEntries(_list);
+ assertEquals(0, numberOfEntries);
+ }
+
+ public void testAddTwoMessagesWithDifferentConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message1 = createTestServerMessage(TEST_KEY_VALUE1);
+ ServerMessage<MessageMetaData> message2 = createTestServerMessage(TEST_KEY_VALUE2);
+
+ _list.add(message1);
+ _list.add(message2);
+
+ int numberOfEntries = countEntries(_list);
+ assertEquals(2, numberOfEntries);
+ }
+
+ public void testAddTwoMessagesWithSameConflationKeyValue()
+ {
+ ServerMessage<MessageMetaData> message1 = createTestServerMessage(TEST_KEY_VALUE);
+ ServerMessage<MessageMetaData> message2 = createTestServerMessage(TEST_KEY_VALUE);
+
+ _list.add(message1);
+ _list.add(message2);
+
+ int numberOfEntries = countEntries(_list);
+ assertEquals(1, numberOfEntries);
+ }
+
+ public void testSupersededEntryIsDiscardedOnRelease()
+ {
+ ServerMessage<MessageMetaData> message1 = createTestServerMessage(TEST_KEY_VALUE);
+ ServerMessage<MessageMetaData> message2 = createTestServerMessage(TEST_KEY_VALUE);
+
+ QueueEntry entry1 = _list.add(message1);
+ entry1.acquire(); // simulate an in-progress delivery to consumer
+
+ _list.add(message2);
+ assertFalse(entry1.isDeleted());
+
+ assertEquals(2, countEntries(_list));
+
+ entry1.release(); // simulate consumer rollback/recover
+
+ assertEquals(1, countEntries(_list));
+ assertTrue(entry1.isDeleted());
+ }
+
+ public void testConflationMapMaintained()
+ {
+ assertEquals(0, _list.getLatestValuesMap().size());
+
+ ServerMessage<MessageMetaData> message = createTestServerMessage(TEST_KEY_VALUE);
+
+ QueueEntry addedEntry = _list.add(message);
+
+ assertEquals(1, countEntries(_list));
+ assertEquals(1, _list.getLatestValuesMap().size());
+
+ addedEntry.discard();
+
+ assertEquals(0, countEntries(_list));
+ assertEquals(0, _list.getLatestValuesMap().size());
+ }
+
+ public void testConflationMapMaintainedWithDifferentConflationKeyValue()
+ {
+
+ assertEquals(0, _list.getLatestValuesMap().size());
+
+ ServerMessage<MessageMetaData> message1 = createTestServerMessage(TEST_KEY_VALUE1);
+ ServerMessage<MessageMetaData> message2 = createTestServerMessage(TEST_KEY_VALUE2);
+
+ QueueEntry addedEntry1 = _list.add(message1);
+ QueueEntry addedEntry2 = _list.add(message2);
+
+ assertEquals(2, countEntries(_list));
+ assertEquals(2, _list.getLatestValuesMap().size());
+
+ addedEntry1.discard();
+ addedEntry2.discard();
+
+ assertEquals(0, countEntries(_list));
+ assertEquals(0, _list.getLatestValuesMap().size());
+ }
+
+ private int countEntries(ConflationQueueList list)
+ {
+ QueueEntryIterator<SimpleQueueEntryImpl> iterator = list.iterator();
+ int count = 0;
+ while(iterator.advance())
+ {
+ count++;
+ }
+ return count;
+ }
+
+ private ServerMessage<MessageMetaData> createTestServerMessage(String conflationKeyValue)
+ {
+ AMQMessage mockMessage = mock(AMQMessage.class);
+
+ AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+ when(messageHeader.getHeader(CONFLATION_KEY)).thenReturn(conflationKeyValue);
+ when(mockMessage.getMessageHeader()).thenReturn(messageHeader);
+
+ AMQMessageReference messageReference = new AMQMessageReference(mockMessage);
+ when(mockMessage.newReference()).thenReturn(messageReference);
+
+ return mockMessage;
+ }
+
+ private AMQQueue createTestQueue()
+ {
+ AMQQueue queue = mock(AMQQueue.class);
+ VirtualHost virtualHost = mock(VirtualHost.class);
+ when(queue.getVirtualHost()).thenReturn(virtualHost);
+
+ return queue;
+ }
+}