/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.AMQException; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.io.IOException; public class ProducerFlowControlTest extends AbstractTestLogging { private static final int TIMEOUT = 10000; private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); private Connection producerConnection; private MessageProducer producer; private Session producerSession; private Queue queue; private Connection consumerConnection; private Session consumerSession; private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); private JMXTestUtils _jmxUtils; private boolean _jmxUtilConnected; private static final String USER = "admin"; public void setUp() throws Exception { _jmxUtils = new JMXTestUtils(this, USER , USER); _jmxUtils.setUp(); _jmxUtilConnected=false; super.setUp(); _monitor.reset(); producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); producerConnection.start(); consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void tearDown() throws Exception { if(_jmxUtilConnected) { try { _jmxUtils.close(); } catch (IOException e) { e.printStackTrace(); } } producerConnection.close(); consumerConnection.close(); super.tearDown(); } public void testCapacityExceededCausesBlock() throws JMSException, NamingException, AMQException, InterruptedException { String queueName = getTestQueueName(); final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); _sentMessages.set(0); // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); Thread.sleep(5000); assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); consumer.receive(); Thread.sleep(1000); assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); consumer.receive(); Thread.sleep(1000); assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); } public void testBrokerLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { String queueName = getTestQueueName(); final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); _sentMessages.set(0); // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); Thread.sleep(5000); List results = waitAndFindMatches("QUE-1003"); assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); while(consumer.receive(1000) != null); results = waitAndFindMatches("QUE-1004"); assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size()); } public void testClientLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { String queueName = getTestQueueName(); setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); _sentMessages.set(0); // try to send 5 messages (should block after 4) MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); Thread.sleep(TIMEOUT); List results = waitAndFindMatches("Message send delayed by", TIMEOUT); assertTrue("No delay messages logged by client",results.size()!=0); results = findMatches("Message send failed due to timeout waiting on broker enforced flow control"); assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); } public void testFlowControlOnCapacityResumeEqual() throws JMSException, NamingException, AMQException, InterruptedException { String queueName = getTestQueueName(); final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",1000); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); _sentMessages.set(0); // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); Thread.sleep(5000); assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); consumer = consumerSession.createConsumer(queue); consumerConnection.start(); consumer.receive(); Thread.sleep(1000); assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); } public void testFlowControlSoak() throws Exception, NamingException, AMQException, InterruptedException { String queueName = getTestQueueName(); _sentMessages.set(0); final int numProducers = 10; final int numMessages = 100; final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",6000); arguments.put("x-qpid-flow-resume-capacity",3000); ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'"); ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); Connection[] producers = new Connection[numProducers]; for(int i = 0 ; i < numProducers; i ++) { producers[i] = getConnection(); producers[i].start(); Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer myproducer = session.createProducer(queue); MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); } consumer = consumerSession.createConsumer(queue); consumerConnection.start(); for(int j = 0; j < numProducers * numMessages; j++) { Message msg = consumer.receive(5000); Thread.sleep(50L); assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); } Message msg = consumer.receive(500); assertNull("extra message received", msg); for(int i = 0; i < numProducers; i++) { producers[i].close(); } } public void testSendTimeout() throws JMSException, NamingException, AMQException, InterruptedException { String queueName = getTestQueueName(); setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); _sentMessages.set(0); // try to send 5 messages (should block after 4) MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); Thread.sleep(10000); Exception e = sender.getException(); assertNotNull("No timeout exception on sending", e); } public void testFlowControlAttributeModificationViaJMX() throws JMSException, NamingException, AMQException, InterruptedException, Exception { _jmxUtils.open(); _jmxUtilConnected = true; String queueName = getTestQueueName(); //create queue final Map arguments = new HashMap(); arguments.put("x-qpid-capacity",0); arguments.put("x-qpid-flow-resume-capacity",0); ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); Thread.sleep(1000); //Create a JMX MBean proxy for the queue ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName)); assertNotNull(queueMBean); //check current attribute values are 0 as expected assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L); assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L); //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent queueMBean.setCapacity(250L); queueMBean.setFlowResumeCapacity(250L); assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L); assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L); assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); // try to send 2 messages (should block after 1) _sentMessages.set(0); sendMessagesAsync(producer, producerSession, 2, 50L); Thread.sleep(2000); //check only 1 message was sent, and queue is overfull assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. queueMBean.setCapacity(300L); queueMBean.setFlowResumeCapacity(300L); Thread.sleep(2000); //check second message was sent, and caused the queue to become overfull again assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded queueMBean.setCapacity(700L); assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); //receive a message, check queue becomes underfull consumer = consumerSession.createConsumer(queue); consumerConnection.start(); consumer.receive(); //perform a synchronous op on the connection ((AMQSession) consumerSession).sync(); assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); consumer.receive(); } private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, final int numMessages, long sleepPeriod) { MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); new Thread(sender).start(); return sender; } private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) throws JMSException { for (int msg = 0; msg < numMessages; msg++) { producer.send(nextMessage(msg, producerSession)); _sentMessages.incrementAndGet(); try { ((AMQSession)producerSession).sync(); } catch (AMQException e) { e.printStackTrace(); throw new RuntimeException(e); } } } private static final byte[] BYTE_300 = new byte[300]; private Message nextMessage(int msg, Session producerSession) throws JMSException { BytesMessage send = producerSession.createBytesMessage(); send.writeBytes(BYTE_300); send.setIntProperty("msg", msg); return send; } private class MessageSender implements Runnable { private final MessageProducer _producer; private final Session _producerSession; private final int _numMessages; private JMSException _exception; private long _sleepPeriod; public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) { _producer = producer; _producerSession = producerSession; _numMessages = numMessages; _sleepPeriod = sleepPeriod; } public void run() { try { sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); } catch (JMSException e) { _exception = e; } } public JMSException getException() { return _exception; } } }