diff options
Diffstat (limited to 'java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java | 34 | ||||
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java | 32 |
2 files changed, 45 insertions, 21 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index a58b7a116a..db3a23cd59 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -21,16 +21,18 @@ package org.apache.qpid.test; import junit.framework.TestCase; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.AMQException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; +import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; import javax.naming.NamingException; @@ -94,14 +96,14 @@ public class VMTestCase extends TestCase env.put("connectionfactory." + c.getKey(), c.getValue()); } - env.put("queue.queue", "queue"); + _queues.put("queue", "queue"); for (Map.Entry<String, String> q : _queues.entrySet()) { env.put("queue." + q.getKey(), q.getValue()); } - env.put("topic.topic", "topic"); + _topics.put("topic", "topic"); for (Map.Entry<String, String> t : _topics.entrySet()) { @@ -113,7 +115,7 @@ public class VMTestCase extends TestCase protected void tearDown() throws Exception { - purgeQueues(); + checkQueuesClean(); TransportConnection.killVMBroker(1); ApplicationRegistry.remove(1); @@ -121,29 +123,35 @@ public class VMTestCase extends TestCase super.tearDown(); } - private void purgeQueues() throws NamingException, JMSException + private void checkQueuesClean() throws NamingException, JMSException { Connection connection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); Iterator<String> queueNames = new HashSet<String>(_queues.values()).iterator(); + assertTrue("QueueNames doesn't have next", queueNames.hasNext()); - //todo this could be replaced with an AMQP purge queue command. while (queueNames.hasNext()) { - MessageConsumer consumer = session.createConsumer(session.createQueue(queueNames.next())); + Queue queue = session.createQueue(queueNames.next()); - Message message = consumer.receive(RECEIVE_TIMEOUT); - - while (message != null) + //Validate that the queue are reporting empty. + long queueDepth = 0; + try + { + queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) queue); + } + catch (AMQException e) { - message = consumer.receive(RECEIVE_TIMEOUT); + //ignore } + assertEquals("Session reports Queue depth not as expected", 0, queueDepth); } - + connection.close(); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index fccc80ec75..9beaa9844a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -62,6 +62,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //Ensure there are no messages on the queue to start with. + checkQueueDepth(0); } public void tearDown() throws Exception @@ -139,6 +141,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase _logger.debug("Checking for " + depth + " messages with QueueBrowser"); } + //Check what the session believes the queue count to be. long queueDepth = 0; try @@ -151,7 +154,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase assertEquals("Session reports Queue depth not as expected", depth, queueDepth); - + // Browse the queue to get a second opinion int msgCount = 0; Enumeration msgs = queueBrowser.getEnumeration(); @@ -277,8 +280,23 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void validate(int messages) throws JMSException { - // continue and try to receive all messages - MessageConsumer consumer = _clientSession.createConsumer(_queue); + //Create a new connection to validate message content + Connection connection = null; + + try + { + connection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + } + catch (NamingException e) + { + fail("Unable to make validation connection"); + } + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(_queue); _logger.info("Verify messages are still on the queue"); @@ -293,13 +311,13 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase } } - consumer.close(); + //Close this new connection + connection.close(); _logger.info("All messages recevied from queue"); //ensure no message left. checkQueueDepth(0); - } protected void checkQueueDepthWithSelectors(int clients, int totalMessages) throws JMSException @@ -387,7 +405,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase checkQueueDepth(messages); - for (int clients = 1; clients < 10; clients++) + for (int clients = 2; clients <= 10; clients++) { checkQueueDepthWithSelectors(clients, messages); } @@ -458,8 +476,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase public void testFailoverAsQueueBrowserCreated() throws JMSException { - System.err.println("Disabled... this appears to be a bug in mina."); - // The IoServiceListenerSupport seems to get stuck in with a managedSession that isn't closing when requested. // So it hangs waiting for the session. int messages = 50; |