diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java | 234 |
1 files changed, 93 insertions, 141 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index 47f334adf6..ad8c856a74 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -20,38 +20,44 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQDestination; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.framing.AMQShortString; - -import javax.jms.*; -import javax.naming.NamingException; -import java.util.HashMap; -import java.util.Map; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.io.IOException; public class ProducerFlowControlTest extends AbstractTestLogging { private static final int TIMEOUT = 10000; - private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); - private Connection producerConnection; - private MessageProducer producer; - private Session producerSession; - private Queue queue; private Connection consumerConnection; + private Session producerSession; private Session consumerSession; - + private MessageProducer producer; private MessageConsumer consumer; - private final AtomicInteger _sentMessages = new AtomicInteger(); + private Queue queue; + + private final AtomicInteger _sentMessages = new AtomicInteger(0); private JMXTestUtils _jmxUtils; private boolean _jmxUtilConnected; @@ -77,37 +83,34 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void tearDown() throws Exception { - if(_jmxUtilConnected) + try { - try + if(_jmxUtilConnected) { - _jmxUtils.close(); - } - catch (IOException e) - { - e.printStackTrace(); + try + { + _jmxUtils.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } } + producerConnection.close(); + consumerConnection.close(); + } + finally + { + super.tearDown(); } - producerConnection.close(); - consumerConnection.close(); - super.tearDown(); } - public void testCapacityExceededCausesBlock() - throws JMSException, NamingException, AMQException, InterruptedException + public void testCapacityExceededCausesBlock() throws Exception { String queueName = getTestQueueName(); - - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",1000); - arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); - - _sentMessages.set(0); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); + producer = producerSession.createProducer(queue); // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); @@ -135,22 +138,14 @@ public class ProducerFlowControlTest extends AbstractTestLogging } - public void testBrokerLogMessages() - throws JMSException, NamingException, AMQException, InterruptedException, IOException + + public void testBrokerLogMessages() throws Exception { String queueName = getTestQueueName(); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",1000); - arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); producer = producerSession.createProducer(queue); - _sentMessages.set(0); - - // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); @@ -162,41 +157,28 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumerConnection.start(); - while(consumer.receive(1000) != null); + while(consumer.receive(1000) != null) {}; results = waitAndFindMatches("QUE-1004"); assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size()); - - - } - public void testClientLogMessages() - throws JMSException, NamingException, AMQException, InterruptedException, IOException + public void testClientLogMessages() throws Exception { String queueName = getTestQueueName(); - + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",1000); - arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) session).declareAndBind((AMQDestination)queue); + createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800); producer = session.createProducer(queue); - _sentMessages.set(0); - - // try to send 5 messages (should block after 4) - MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + MessageSender sender = sendMessagesAsync(producer, session, 5, 50L); List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT); assertTrue("No delay messages logged by client",results.size()!=0); @@ -205,26 +187,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging + " flow control", TIMEOUT); assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay " + "messages)",1,failedMessages.size()); - - - } - public void testFlowControlOnCapacityResumeEqual() - throws JMSException, NamingException, AMQException, InterruptedException + public void testFlowControlOnCapacityResumeEqual() throws Exception { String queueName = getTestQueueName(); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",1000); - arguments.put("x-qpid-flow-resume-capacity",1000); - ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 1000); producer = producerSession.createProducer(queue); - _sentMessages.set(0); // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); @@ -236,7 +208,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - consumer.receive(); Thread.sleep(1000); @@ -247,23 +218,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging } - public void testFlowControlSoak() - throws Exception, NamingException, AMQException, InterruptedException + public void testFlowControlSoak() throws Exception { String queueName = getTestQueueName(); - _sentMessages.set(0); + final int numProducers = 10; final int numMessages = 100; - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",6000); - arguments.put("x-qpid-flow-resume-capacity",3000); - - ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 6000, 3000); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'"); - ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); Connection[] producers = new Connection[numProducers]; @@ -303,58 +267,38 @@ public class ProducerFlowControlTest extends AbstractTestLogging } - - - public void testSendTimeout() - throws JMSException, NamingException, AMQException, InterruptedException + public void testSendTimeout() throws Exception { String queueName = getTestQueueName(); - + final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit" + : "Unable to send message for 3 seconds due to broker enforced flow control"; + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",1000); - arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) session).declareAndBind((AMQDestination)queue); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800); producer = session.createProducer(queue); - _sentMessages.set(0); - - // try to send 5 messages (should block after 4) - MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L); + MessageSender sender = sendMessagesAsync(producer, session, 5, 100L); - - Thread.sleep(10000); - - Exception e = sender.getException(); + Exception e = sender.awaitSenderException(10000); assertNotNull("No timeout exception on sending", e); + + assertEquals("Unexpected exception reason", expectedMsg, e.getMessage()); + } - - - public void testFlowControlAttributeModificationViaJMX() - throws JMSException, NamingException, AMQException, InterruptedException, Exception + + public void testFlowControlAttributeModificationViaJMX() throws Exception { _jmxUtils.open(); _jmxUtilConnected = true; String queueName = getTestQueueName(); - - //create queue - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity",0); - arguments.put("x-qpid-flow-resume-capacity",0); - ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); - - queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 0, 0); producer = producerSession.createProducer(queue); Thread.sleep(1000); @@ -375,7 +319,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); // try to send 2 messages (should block after 1) - _sentMessages.set(0); + sendMessagesAsync(producer, producerSession, 2, 50L); Thread.sleep(2000); @@ -406,13 +350,23 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumer.receive(); //perform a synchronous op on the connection - ((AMQSession) consumerSession).sync(); + ((AMQSession<?,?>) consumerSession).sync(); assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); consumer.receive(); } + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",capacity); + arguments.put("x-qpid-flow-resume-capacity",resumeCapacity); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + } + private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, final int numMessages, @@ -435,7 +389,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging try { - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)producerSession).sync(); } catch (AMQException e) { @@ -456,7 +410,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging private static final byte[] BYTE_300 = new byte[300]; - private Message nextMessage(int msg, Session producerSession) throws JMSException { BytesMessage send = producerSession.createBytesMessage(); @@ -466,22 +419,19 @@ public class ProducerFlowControlTest extends AbstractTestLogging return send; } - private class MessageSender implements Runnable { - private final MessageProducer _producer; - private final Session _producerSession; + private final MessageProducer _senderProducer; + private final Session _senderSession; private final int _numMessages; - - - - private JMSException _exception; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); private long _sleepPeriod; public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) { - _producer = producer; - _producerSession = producerSession; + _senderProducer = producer; + _senderSession = producerSession; _numMessages = numMessages; _sleepPeriod = sleepPeriod; } @@ -490,16 +440,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging { try { - sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod); } catch (JMSException e) { _exception = e; + _exceptionThrownLatch.countDown(); } } - public JMSException getException() + public Exception awaitSenderException(long timeout) throws InterruptedException { + _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); return _exception; } } |