summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java234
1 files changed, 93 insertions, 141 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index 47f334adf6..ad8c856a74 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -20,38 +20,44 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQDestination;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import javax.naming.NamingException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.io.IOException;
public class ProducerFlowControlTest extends AbstractTestLogging
{
private static final int TIMEOUT = 10000;
- private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
-
private Connection producerConnection;
- private MessageProducer producer;
- private Session producerSession;
- private Queue queue;
private Connection consumerConnection;
+ private Session producerSession;
private Session consumerSession;
-
+ private MessageProducer producer;
private MessageConsumer consumer;
- private final AtomicInteger _sentMessages = new AtomicInteger();
+ private Queue queue;
+
+ private final AtomicInteger _sentMessages = new AtomicInteger(0);
private JMXTestUtils _jmxUtils;
private boolean _jmxUtilConnected;
@@ -77,37 +83,34 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void tearDown() throws Exception
{
- if(_jmxUtilConnected)
+ try
{
- try
+ if(_jmxUtilConnected)
{
- _jmxUtils.close();
- }
- catch (IOException e)
- {
- e.printStackTrace();
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
+ producerConnection.close();
+ consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
}
- producerConnection.close();
- consumerConnection.close();
- super.tearDown();
}
- public void testCapacityExceededCausesBlock()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testCapacityExceededCausesBlock() throws Exception
{
String queueName = getTestQueueName();
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
-
- _sentMessages.set(0);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+ producer = producerSession.createProducer(queue);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -135,22 +138,14 @@ public class ProducerFlowControlTest extends AbstractTestLogging
}
- public void testBrokerLogMessages()
- throws JMSException, NamingException, AMQException, InterruptedException, IOException
+
+ public void testBrokerLogMessages() throws Exception
{
String queueName = getTestQueueName();
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
producer = producerSession.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -162,41 +157,28 @@ public class ProducerFlowControlTest extends AbstractTestLogging
consumerConnection.start();
- while(consumer.receive(1000) != null);
+ while(consumer.receive(1000) != null) {};
results = waitAndFindMatches("QUE-1004");
assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
-
-
-
}
- public void testClientLogMessages()
- throws JMSException, NamingException, AMQException, InterruptedException, IOException
+ public void testClientLogMessages() throws Exception
{
String queueName = getTestQueueName();
-
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
producer = session.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 50L);
List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
assertTrue("No delay messages logged by client",results.size()!=0);
@@ -205,26 +187,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging
+ " flow control", TIMEOUT);
assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+ "messages)",1,failedMessages.size());
-
-
-
}
- public void testFlowControlOnCapacityResumeEqual()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testFlowControlOnCapacityResumeEqual() throws Exception
{
String queueName = getTestQueueName();
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",1000);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000);
producer = producerSession.createProducer(queue);
- _sentMessages.set(0);
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -236,7 +208,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
-
consumer.receive();
Thread.sleep(1000);
@@ -247,23 +218,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging
}
- public void testFlowControlSoak()
- throws Exception, NamingException, AMQException, InterruptedException
+ public void testFlowControlSoak() throws Exception
{
String queueName = getTestQueueName();
- _sentMessages.set(0);
+
final int numProducers = 10;
final int numMessages = 100;
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",6000);
- arguments.put("x-qpid-flow-resume-capacity",3000);
-
- ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
- ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
Connection[] producers = new Connection[numProducers];
@@ -303,58 +267,38 @@ public class ProducerFlowControlTest extends AbstractTestLogging
}
-
-
- public void testSendTimeout()
- throws JMSException, NamingException, AMQException, InterruptedException
+ public void testSendTimeout() throws Exception
{
String queueName = getTestQueueName();
-
+ final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+ : "Unable to send message for 3 seconds due to broker enforced flow control";
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",1000);
- arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) session).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
producer = session.createProducer(queue);
- _sentMessages.set(0);
-
-
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L);
+ MessageSender sender = sendMessagesAsync(producer, session, 5, 100L);
-
- Thread.sleep(10000);
-
- Exception e = sender.getException();
+ Exception e = sender.awaitSenderException(10000);
assertNotNull("No timeout exception on sending", e);
+
+ assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+
}
-
-
- public void testFlowControlAttributeModificationViaJMX()
- throws JMSException, NamingException, AMQException, InterruptedException, Exception
+
+ public void testFlowControlAttributeModificationViaJMX() throws Exception
{
_jmxUtils.open();
_jmxUtilConnected = true;
String queueName = getTestQueueName();
-
- //create queue
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",0);
- arguments.put("x-qpid-flow-resume-capacity",0);
- ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
-
- queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0);
producer = producerSession.createProducer(queue);
Thread.sleep(1000);
@@ -375,7 +319,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
// try to send 2 messages (should block after 1)
- _sentMessages.set(0);
+
sendMessagesAsync(producer, producerSession, 2, 50L);
Thread.sleep(2000);
@@ -406,13 +350,23 @@ public class ProducerFlowControlTest extends AbstractTestLogging
consumer.receive();
//perform a synchronous op on the connection
- ((AMQSession) consumerSession).sync();
+ ((AMQSession<?,?>) consumerSession).sync();
assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
consumer.receive();
}
+ private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",capacity);
+ arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
+
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
final int numMessages,
@@ -435,7 +389,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
try
{
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)producerSession).sync();
}
catch (AMQException e)
{
@@ -456,7 +410,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private static final byte[] BYTE_300 = new byte[300];
-
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
BytesMessage send = producerSession.createBytesMessage();
@@ -466,22 +419,19 @@ public class ProducerFlowControlTest extends AbstractTestLogging
return send;
}
-
private class MessageSender implements Runnable
{
- private final MessageProducer _producer;
- private final Session _producerSession;
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
private final int _numMessages;
-
-
-
- private JMSException _exception;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
private long _sleepPeriod;
public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
{
- _producer = producer;
- _producerSession = producerSession;
+ _senderProducer = producer;
+ _senderSession = producerSession;
_numMessages = numMessages;
_sleepPeriod = sleepPeriod;
}
@@ -490,16 +440,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging
{
try
{
- sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
}
catch (JMSException e)
{
_exception = e;
+ _exceptionThrownLatch.countDown();
}
}
- public JMSException getException()
+ public Exception awaitSenderException(long timeout) throws InterruptedException
{
+ _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
return _exception;
}
}