summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java34
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java32
2 files changed, 45 insertions, 21 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
index a58b7a116a..db3a23cd59 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -21,16 +21,18 @@
package org.apache.qpid.test;
import junit.framework.TestCase;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.AMQException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -94,14 +96,14 @@ public class VMTestCase extends TestCase
env.put("connectionfactory." + c.getKey(), c.getValue());
}
- env.put("queue.queue", "queue");
+ _queues.put("queue", "queue");
for (Map.Entry<String, String> q : _queues.entrySet())
{
env.put("queue." + q.getKey(), q.getValue());
}
- env.put("topic.topic", "topic");
+ _topics.put("topic", "topic");
for (Map.Entry<String, String> t : _topics.entrySet())
{
@@ -113,7 +115,7 @@ public class VMTestCase extends TestCase
protected void tearDown() throws Exception
{
- purgeQueues();
+ checkQueuesClean();
TransportConnection.killVMBroker(1);
ApplicationRegistry.remove(1);
@@ -121,29 +123,35 @@ public class VMTestCase extends TestCase
super.tearDown();
}
- private void purgeQueues() throws NamingException, JMSException
+ private void checkQueuesClean() throws NamingException, JMSException
{
Connection connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
Iterator<String> queueNames = new HashSet<String>(_queues.values()).iterator();
+ assertTrue("QueueNames doesn't have next", queueNames.hasNext());
- //todo this could be replaced with an AMQP purge queue command.
while (queueNames.hasNext())
{
- MessageConsumer consumer = session.createConsumer(session.createQueue(queueNames.next()));
+ Queue queue = session.createQueue(queueNames.next());
- Message message = consumer.receive(RECEIVE_TIMEOUT);
-
- while (message != null)
+ //Validate that the queue are reporting empty.
+ long queueDepth = 0;
+ try
+ {
+ queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) queue);
+ }
+ catch (AMQException e)
{
- message = consumer.receive(RECEIVE_TIMEOUT);
+ //ignore
}
+ assertEquals("Session reports Queue depth not as expected", 0, queueDepth);
}
-
+
connection.close();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index fccc80ec75..9beaa9844a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -62,6 +62,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
_clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ //Ensure there are no messages on the queue to start with.
+ checkQueueDepth(0);
}
public void tearDown() throws Exception
@@ -139,6 +141,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
_logger.debug("Checking for " + depth + " messages with QueueBrowser");
}
+ //Check what the session believes the queue count to be.
long queueDepth = 0;
try
@@ -151,7 +154,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
assertEquals("Session reports Queue depth not as expected", depth, queueDepth);
-
+ // Browse the queue to get a second opinion
int msgCount = 0;
Enumeration msgs = queueBrowser.getEnumeration();
@@ -277,8 +280,23 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
protected void validate(int messages) throws JMSException
{
- // continue and try to receive all messages
- MessageConsumer consumer = _clientSession.createConsumer(_queue);
+ //Create a new connection to validate message content
+ Connection connection = null;
+
+ try
+ {
+ connection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ }
+ catch (NamingException e)
+ {
+ fail("Unable to make validation connection");
+ }
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(_queue);
_logger.info("Verify messages are still on the queue");
@@ -293,13 +311,13 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
}
}
- consumer.close();
+ //Close this new connection
+ connection.close();
_logger.info("All messages recevied from queue");
//ensure no message left.
checkQueueDepth(0);
-
}
protected void checkQueueDepthWithSelectors(int clients, int totalMessages) throws JMSException
@@ -387,7 +405,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
checkQueueDepth(messages);
- for (int clients = 1; clients < 10; clients++)
+ for (int clients = 2; clients <= 10; clients++)
{
checkQueueDepthWithSelectors(clients, messages);
}
@@ -458,8 +476,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
public void testFailoverAsQueueBrowserCreated() throws JMSException
{
- System.err.println("Disabled... this appears to be a bug in mina.");
-
// The IoServiceListenerSupport seems to get stuck in with a managedSession that isn't closing when requested.
// So it hangs waiting for the session.
int messages = 50;