summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-02-17 14:34:10 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-02-17 14:34:10 +0000
commita6346f88fda91f1df09417e0306d0f5370dbc9bf (patch)
treef0c865212ca13a9f43928df699704deddfbd965f
parentbb1d6f435dad40b0afdbccf317abf899261a9432 (diff)
downloadqpid-python-a6346f88fda91f1df09417e0306d0f5370dbc9bf.tar.gz
QPID-3047: Fix QueueDepthWithSelectorTest on 0-10
Refactor test and fix 0-10 client session to flush acks git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1071620 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java141
2 files changed, 31 insertions, 111 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1eaccf53fc..6fa22b7971 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -942,6 +942,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected Long requestQueueDepth(AMQDestination amqd)
{
+ flushAcknowledgments();
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
index 6211dd8e70..74f50e8659 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -21,30 +21,18 @@
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* Test Case to ensure that messages are correctly returned.
@@ -52,19 +40,12 @@ import java.util.Hashtable;
* - The message is returned.
* - The broker doesn't leak memory.
* - The broker's state is correct after test.
- *
- * Why is this hardcoded to InVM testing, should be converted to QTC.
*/
-public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
+public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
{
- protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
-
- protected final String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;
protected final String VHOST = "test";
protected final String QUEUE = this.getClass().getName();
- protected Context _context;
-
protected Connection _clientConnection;
protected Connection _producerConnection;
private Session _clientSession;
@@ -82,47 +63,21 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
public void setUp() throws Exception
{
super.setUp();
- TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
-
- System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level"));
- System.err.println("_logger.level:" + _logger.getLevel());
- System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR));
- System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN));
- System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() + ":" + _logger.isEnabledFor(Level.INFO));
- System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG));
- System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE));
-
- System.err.println(Logger.getRootLogger().getLoggerRepository());
-
- InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
- Hashtable<String, String> env = new Hashtable<String, String>();
-
- env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
-
- _context = factory.getInitialContext(env);
_messages = new Message[MSG_COUNT];
- _queue = (Queue) _context.lookup("queue");
- init();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- if (_producerConnection != null)
- {
- _producerConnection.close();
- }
-
- if (_clientConnection != null)
- {
- _clientConnection.close();
- }
+ _queue = getTestQueue();
+
+ //Create Producer
+ _producerConnection = getConnection();
+ _producerConnection.start();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _producer = _producerSession.createProducer(_queue);
- TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- super.tearDown();
+ // Create consumer
+ _clientConnection = getConnection();
+ _clientConnection.start();
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = _clientSession.createConsumer(_queue, "key = 23");
}
public void test() throws Exception
@@ -139,7 +94,8 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
//Verify we get all the messages.
_logger.info("Verifying messages");
- verifyAllMessagesRecevied(0);
+ verifyAllMessagesRecevied(50);
+ verifyBrokerState(0);
//Close the connection.. .giving the broker time to clean up its state.
_clientConnection.close();
@@ -149,39 +105,18 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
verifyBrokerState(0);
}
- protected void init() throws NamingException, JMSException, AMQException
- {
- //Create Producer
- _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
- _producerConnection.start();
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _producer = _producerSession.createProducer(_queue);
-
- // Create consumer
- _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
- _clientConnection.start();
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer = _clientSession.createConsumer(_queue, "key = 23");
- }
-
protected void verifyBrokerState(int expectedDepth)
{
try
{
- _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
-
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
+ Connection connection = getConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
Thread.sleep(2000);
- long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+ long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
+
+ connection.close();
}
catch (InterruptedException e)
{
@@ -191,34 +126,22 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
{
fail(e.getMessage());
}
- finally
+ catch (Exception e)
{
- try
- {
- _clientConnection.close();
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
+ fail(e.getMessage());
}
-
}
protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception
{
-
boolean[] msgIdRecevied = new boolean[MSG_COUNT];
- for (int i = 0; i < MSG_COUNT; i++)
+ for (int i = 0; i < expectedDepth; i++)
{
_messages[i] = _consumer.receive(1000);
assertNotNull("should have received a message but didn't", _messages[i]);
}
-
- long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
- assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
-
+
//Check received messages
int msgId = 0;
for (Message msg : _messages)
@@ -231,7 +154,7 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
}
//Check all received
- for (msgId = 0; msgId < MSG_COUNT; msgId++)
+ for (msgId = 0; msgId < expectedDepth; msgId++)
{
assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
}
@@ -241,9 +164,6 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
* Get the next message putting the given count into the intProperties as ID.
*
* @param msgNo the message count to store as ID.
- *
- * @return
- *
* @throws JMSException
*/
protected Message nextMessage(int msgNo) throws JMSException
@@ -253,5 +173,4 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
send.setIntProperty("key", 23);
return send;
}
-
}