summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java36
1 files changed, 34 insertions, 2 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index ad8c856a74..13053d02df 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -357,13 +357,45 @@ public class ProducerFlowControlTest extends AbstractTestLogging
consumer.receive();
}
+ public void testQueueDeleteWithBlockedFlow() throws Exception
+ {
+ String queueName = getTestQueueName();
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false);
+
+ producer = producerSession.createProducer(queue);
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
+ // close blocked producer session and connection
+ producerConnection.close();
+
+ // delete queue with a consumer session
+ ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName));
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(1000l);
+ assertNull("Unexpected message", message);
+ }
+
private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
{
+ createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true);
+ }
+
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception
+ {
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",capacity);
arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
- ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments);
+ queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'");
((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
}