/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.client.prefetch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class PrefetchBehaviourTest extends QpidBrokerTestCase { protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class); private Connection _normalConnection; private AtomicBoolean _exceptionCaught; private CountDownLatch _processingStarted; private CountDownLatch _processingCompleted; protected void setUp() throws Exception { super.setUp(); _normalConnection = getConnection(); _exceptionCaught = new AtomicBoolean(); _processingStarted = new CountDownLatch(1); _processingCompleted = new CountDownLatch(1); } /** * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only * gets 1 of the messages sent, with the second consumer picking up the others while the * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin. */ public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception { final long processingTime = 5000; //create a second connection with prefetch set to 1 setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); Connection prefetch1Connection = getConnection(); prefetch1Connection.start(); _normalConnection.start(); //create an asynchronous consumer with simulated slow processing final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = prefetch1session.createQueue(getTestQueueName()); MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue); prefetch1consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { _logger.debug("starting processing"); _processingStarted.countDown(); _logger.debug("processing started"); //simulate message processing Thread.sleep(processingTime); prefetch1session.commit(); _processingCompleted.countDown(); } catch(Exception e) { _logger.error("Exception caught in message listener"); _exceptionCaught.set(true); } } }); //create producer and send 5 messages Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue); for (int i = 0; i < 5; i++) { producer.send(producerSession.createTextMessage("test")); } producerSession.commit(); //wait for the first message to start being processed by the async consumer assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS)); _logger.debug("proceeding with test"); //try to consumer the other messages with another consumer while the async procesisng occurs Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageConsumer normalConsumer = normalSession.createConsumer(queue); Message msg; // Check that other consumer gets the other 4 messages for (int i = 0; i < 4; i++) { msg = normalConsumer.receive(1500); assertNotNull("Consumer should receive 4 messages",msg); } msg = normalConsumer.receive(250); assertNull("Consumer should not have received a 5th message",msg); //wait for the other consumer to finish to ensure it completes ok _logger.debug("waiting for async consumer to complete"); assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS)); assertFalse("Unexpected exception during async message processing",_exceptionCaught.get()); } /** * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty. * */ public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception { Queue queue = getTestQueue(); setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); Connection connection = getConnection(); connection.start(); // Create Consumer A Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumerA = consSessA.createConsumer(queue); // ensure message delivery to consumer A is started (required for 0-8..0-9-1) final Message msg = consumerA.receiveNoWait(); assertNull(msg); Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); sendMessage(producerSession, queue, 3); // Create Consumer B MessageConsumer consumerB = null; if (isBroker010()) { // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A consumerB = consSessA.createConsumer(queue); } else { // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumerB = consSessB.createConsumer(queue); } // As message delivery to consumer A is already started, the first two messages should // now be with consumer A. The last message will still be on the Broker as consumer A's // credit is exhausted and message delivery for consumer B is not yet running. // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A. // If we were to reverse the order, the SessionComplete will restore Consumer A's credit, // and the third message could be delivered to either Consumer A or Consumer B. // Check that consumer B gets the last (third) message. final Message msgConsumerB = consumerB.receive(1500); assertNotNull("Consumer B should have received a message", msgConsumerB); assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX)); // Now check that consumer A has indeed got the first two messages. for (int i = 0; i < 2; i++) { final Message msgConsumerA = consumerA.receive(1500); assertNotNull("Consumer A should have received a message " + i, msgConsumerA); assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX)); } } /** * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. * Try to receive all 10 messages. */ public void testConnectionStop() throws Exception { setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); Connection con = getConnection(); con.start(); Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); MessageProducer prod = ssn.createProducer(queue); for (int i=0; i<10;i++) { prod.send(ssn.createTextMessage("Msg" + i)); } MessageConsumer consumer = ssn.createConsumer(queue); // This is to ensure we get the first client to prefetch. Message msg = consumer.receive(1000); assertNotNull("The first consumer should get one message",msg); con.stop(); Connection con2 = getConnection(); con2.start(); Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer2 = ssn2.createConsumer(queue); for (int i=0; i<9;i++) { TextMessage m = (TextMessage)consumer2.receive(1000); assertNotNull("The second consumer should get 9 messages, but received only " + i,m); } } }