diff options
author | Keith Wall <kwall@apache.org> | 2012-05-09 09:32:55 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-05-09 09:32:55 +0000 |
commit | f982f86ceede56c2fa153e1cef21a31a75b5a669 (patch) | |
tree | 18205cbd998dfd2868483f43d1b2fbad9b2ec6db | |
parent | 55fe552069b300c041011e26323d38687ec52d12 (diff) | |
download | qpid-python-f982f86ceede56c2fa153e1cef21a31a75b5a669.tar.gz |
QPID-3980: Last Value Queue: Replaced conflation queue entries are never deleted (entries remained acquired permanently) causing leak.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336026 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 5 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java | 113 |
2 files changed, 44 insertions, 74 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 6a2e4f155d..75e6f2cfdc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -79,10 +79,7 @@ public class ConflationQueueList extends SimpleQueueEntryList if(oldEntry.compareTo(entry) < 0) { // We replaced some other entry to become the newest value - if(oldEntry.acquire()) - { - discardEntry(oldEntry); - } + discardEntry(oldEntry); } else if (oldEntry.compareTo(entry) > 0) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java index ae7be6f7f4..7404f18aa3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java @@ -21,8 +21,7 @@ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; - +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -44,45 +43,27 @@ import java.util.Map; public class ConflationQueueTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 1500; - - - private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class); - - - - protected final String VHOST = "/test"; - protected final String QUEUE = "ConflationQueue"; private static final int MSG_COUNT = 400; + private String queueName; private Connection producerConnection; private MessageProducer producer; private Session producerSession; private Queue queue; private Connection consumerConnection; private Session consumerSession; - - private MessageConsumer consumer; protected void setUp() throws Exception { super.setUp(); + queueName = getTestQueueName(); producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); producerConnection.start(); - - - } - - protected void tearDown() throws Exception - { - producerConnection.close(); - consumerConnection.close(); - super.tearDown(); } public void testConflation() throws Exception @@ -90,12 +71,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createConflationQueue(producerSession); producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) @@ -124,22 +100,15 @@ public class ConflationQueueTest extends QpidBrokerTestCase Message msg = messages.get(i); assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); } - - } - public void testConflationWithRelease() throws Exception { consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createConflationQueue(producerSession); producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) @@ -149,7 +118,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); @@ -183,7 +152,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); @@ -205,18 +174,13 @@ public class ConflationQueueTest extends QpidBrokerTestCase } - public void testConflationWithReleaseAfterNewPublish() throws Exception { consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createConflationQueue(producerSession); producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) @@ -225,7 +189,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); @@ -252,7 +216,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); // this causes the "old" messages to be released @@ -284,17 +248,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase } + public void testConflatedQueueDepth() throws Exception + { + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(producerSession); + producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(nextMessage(msg, producerSession)); + } + + final long queueDepth = ((AMQSession<?, ?>)producerSession).getQueueDepth((AMQDestination)queue, true); + + assertEquals(10, queueDepth); + } + public void testConflationBrowser() throws Exception { consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createConflationQueue(producerSession); producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) @@ -303,9 +281,9 @@ public class ConflationQueueTest extends QpidBrokerTestCase } - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); consumer = consumerSession.createConsumer(browseQueue); @@ -329,7 +307,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase producer.send(nextMessage(MSG_COUNT, producerSession)); - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); while((received = consumer.receive(1000))!=null) { @@ -354,22 +332,17 @@ public class ConflationQueueTest extends QpidBrokerTestCase consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createConflationQueue(producerSession); producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) { producer.send(nextMessage(msg, producerSession)); - } - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); consumer = consumerSession.createConsumer(browseQueue); @@ -412,12 +385,16 @@ public class ConflationQueueTest extends QpidBrokerTestCase producer.close(); producerSession.close(); producerConnection.close(); - - - } - + private void createConflationQueue(Session session) throws AMQException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.last_value_queue_key","key"); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); + queue = new AMQQueue("amq.direct", queueName); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + } private Message nextMessage(int msg, Session producerSession) throws JMSException { @@ -428,8 +405,4 @@ public class ConflationQueueTest extends QpidBrokerTestCase return send; } - - } - - |