diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java | 496 |
1 files changed, 496 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000000..f78b327209 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -0,0 +1,496 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.AMQException; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.framing.AMQShortString; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; + +public class ProducerFlowControlTest extends AbstractTestLogging +{ + private static final int TIMEOUT = 10000; + + private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + private MessageConsumer consumer; + private final AtomicInteger _sentMessages = new AtomicInteger(); + + private JMXTestUtils _jmxUtils; + private boolean _jmxUtilConnected; + private static final String USER = "admin"; + + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this, USER , USER); + _jmxUtils.setUp(); + _jmxUtilConnected=false; + super.setUp(); + + _monitor.reset(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + if(_jmxUtilConnected) + { + try + { + _jmxUtils.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + public void testCapacityExceededCausesBlock() + throws JMSException, NamingException, AMQException, InterruptedException + { + String queueName = getTestQueueName(); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + + } + + public void testBrokerLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + String queueName = getTestQueueName(); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + List<String> results = waitAndFindMatches("QUE-1003"); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null); + + results = waitAndFindMatches("QUE-1004"); + + assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size()); + + + + } + + + public void testClientLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + String queueName = getTestQueueName(); + + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(TIMEOUT); + List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT); + assertTrue("No delay messages logged by client",results.size()!=0); + results = findMatches("Message send failed due to timeout waiting on broker enforced flow control"); + assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + + + } + + + public void testFlowControlOnCapacityResumeEqual() + throws JMSException, NamingException, AMQException, InterruptedException + { + String queueName = getTestQueueName(); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",1000); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); + + + } + + + public void testFlowControlSoak() + throws Exception, NamingException, AMQException, InterruptedException + { + String queueName = getTestQueueName(); + + _sentMessages.set(0); + final int numProducers = 10; + final int numMessages = 100; + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",6000); + arguments.put("x-qpid-flow-resume-capacity",3000); + + ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); + + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'"); + ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); + consumerConnection.start(); + + Connection[] producers = new Connection[numProducers]; + for(int i = 0 ; i < numProducers; i ++) + { + + producers[i] = getConnection(); + producers[i].start(); + Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer myproducer = session.createProducer(queue); + MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); + } + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for(int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(5000); + Thread.sleep(50L); + assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); + + } + + + + Message msg = consumer.receive(500); + assertNull("extra message received", msg); + + + for(int i = 0; i < numProducers; i++) + { + producers[i].close(); + } + + } + + + + public void testSendTimeout() + throws JMSException, NamingException, AMQException, InterruptedException + { + String queueName = getTestQueueName(); + + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + + Exception e = sender.getException(); + + assertNotNull("No timeout exception on sending", e); + + } + + + public void testFlowControlAttributeModificationViaJMX() + throws JMSException, NamingException, AMQException, InterruptedException, Exception + { + _jmxUtils.open(); + _jmxUtilConnected = true; + + String queueName = getTestQueueName(); + + //create queue + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",0); + arguments.put("x-qpid-flow-resume-capacity",0); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + + queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + Thread.sleep(1000); + + //Create a JMX MBean proxy for the queue + ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull(queueMBean); + + //check current attribute values are 0 as expected + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L); + + //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent + queueMBean.setCapacity(250L); + queueMBean.setFlowResumeCapacity(250L); + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + // try to send 2 messages (should block after 1) + _sentMessages.set(0); + sendMessagesAsync(producer, producerSession, 2, 50L); + + Thread.sleep(2000); + + //check only 1 message was sent, and queue is overfull + assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. + queueMBean.setCapacity(300L); + queueMBean.setFlowResumeCapacity(300L); + + Thread.sleep(2000); + + //check second message was sent, and caused the queue to become overfull again + assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded + queueMBean.setCapacity(700L); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //receive a message, check queue becomes underfull + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + consumer.receive(); + + //perform a synchronous op on the connection + ((AMQSession) consumerSession).sync(); + + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + consumer.receive(); + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); + new Thread(sender).start(); + return sender; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + + try + { + ((AMQSession)producerSession).sync(); + } + catch (AMQException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + + private static final byte[] BYTE_300 = new byte[300]; + + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + + return send; + } + + + private class MessageSender implements Runnable + { + private final MessageProducer _producer; + private final Session _producerSession; + private final int _numMessages; + + + + private JMSException _exception; + private long _sleepPeriod; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + { + _producer = producer; + _producerSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + } + + public void run() + { + try + { + sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); + } + catch (JMSException e) + { + _exception = e; + } + } + + public JMSException getException() + { + return _exception; + } + } +} |