summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-09 09:32:55 +0000
committerKeith Wall <kwall@apache.org>2012-05-09 09:32:55 +0000
commitf982f86ceede56c2fa153e1cef21a31a75b5a669 (patch)
tree18205cbd998dfd2868483f43d1b2fbad9b2ec6db
parent55fe552069b300c041011e26323d38687ec52d12 (diff)
downloadqpid-python-f982f86ceede56c2fa153e1cef21a31a75b5a669.tar.gz
QPID-3980: Last Value Queue: Replaced conflation queue entries are never deleted (entries remained acquired permanently) causing leak.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336026 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java113
2 files changed, 44 insertions, 74 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 6a2e4f155d..75e6f2cfdc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -79,10 +79,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
if(oldEntry.compareTo(entry) < 0)
{
// We replaced some other entry to become the newest value
- if(oldEntry.acquire())
- {
- discardEntry(oldEntry);
- }
+ discardEntry(oldEntry);
}
else if (oldEntry.compareTo(entry) > 0)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
index ae7be6f7f4..7404f18aa3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
@@ -21,8 +21,7 @@
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -44,45 +43,27 @@ import java.util.Map;
public class ConflationQueueTest extends QpidBrokerTestCase
{
- private static final int TIMEOUT = 1500;
-
-
- private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
-
-
-
- protected final String VHOST = "/test";
- protected final String QUEUE = "ConflationQueue";
private static final int MSG_COUNT = 400;
+ private String queueName;
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
private Queue queue;
private Connection consumerConnection;
private Session consumerSession;
-
-
private MessageConsumer consumer;
protected void setUp() throws Exception
{
super.setUp();
+ queueName = getTestQueueName();
producerConnection = getConnection();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
-
-
- }
-
- protected void tearDown() throws Exception
- {
- producerConnection.close();
- consumerConnection.close();
- super.tearDown();
}
public void testConflation() throws Exception
@@ -90,12 +71,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createConflationQueue(producerSession);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -124,22 +100,15 @@ public class ConflationQueueTest extends QpidBrokerTestCase
Message msg = messages.get(i);
assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
}
-
-
}
-
public void testConflationWithRelease() throws Exception
{
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createConflationQueue(producerSession);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -149,7 +118,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
@@ -183,7 +152,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
@@ -205,18 +174,13 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
-
public void testConflationWithReleaseAfterNewPublish() throws Exception
{
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createConflationQueue(producerSession);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
@@ -225,7 +189,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
@@ -252,7 +216,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
// this causes the "old" messages to be released
@@ -284,17 +248,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
+ public void testConflatedQueueDepth() throws Exception
+ {
+ consumerConnection = getConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createConflationQueue(producerSession);
+ producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ }
+
+ final long queueDepth = ((AMQSession<?, ?>)producerSession).getQueueDepth((AMQDestination)queue, true);
+
+ assertEquals(10, queueDepth);
+ }
+
public void testConflationBrowser() throws Exception
{
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createConflationQueue(producerSession);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
@@ -303,9 +281,9 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
consumer = consumerSession.createConsumer(browseQueue);
@@ -329,7 +307,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
producer.send(nextMessage(MSG_COUNT, producerSession));
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
while((received = consumer.receive(1000))!=null)
{
@@ -354,22 +332,17 @@ public class ConflationQueueTest extends QpidBrokerTestCase
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createConflationQueue(producerSession);
producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
producer.send(nextMessage(msg, producerSession));
-
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
consumer = consumerSession.createConsumer(browseQueue);
@@ -412,12 +385,16 @@ public class ConflationQueueTest extends QpidBrokerTestCase
producer.close();
producerSession.close();
producerConnection.close();
-
-
-
}
-
+ private void createConflationQueue(Session session) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
+ queue = new AMQQueue("amq.direct", queueName);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
@@ -428,8 +405,4 @@ public class ConflationQueueTest extends QpidBrokerTestCase
return send;
}
-
-
}
-
-