summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java170
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java271
4 files changed, 410 insertions, 59 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 62b54d3086..f9cf48a2b1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -61,7 +61,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
setupSession();
- _queue = _clientSession.createQueue(getName()+System.currentTimeMillis());
+ _queue = _clientSession.createQueue(getTestQueueName());
_clientSession.createConsumer(_queue).close();
//Ensure there are no messages on the queue to start with.
@@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 39e2b892a9..ff766c907d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -22,64 +22,172 @@ package org.apache.qpid.test.client;
import org.apache.qpid.test.utils.*;
import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
/**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ * - Validating that the first message received is always message 1.
+ * - Receive a few more so that there are a few messages to reject.
+ * - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
*
*/
-
public class RollbackOrderTest extends QpidTestCase
{
- private Connection conn;
- private Queue queue;
- private Session ssn;
- private MessageProducer prod;
- private MessageConsumer cons;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _session;
+ private MessageConsumer _consumer;
@Override public void setUp() throws Exception
{
super.setUp();
- conn = getConnection();
- conn.start();
- ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
- queue = ssn.createQueue("rollback-order-test-queue");
- prod = ssn.createProducer(queue);
- cons = ssn.createConsumer(queue);
- for (int i = 0; i < 5; i++)
- {
- TextMessage msg = ssn.createTextMessage("message " + (i+1));
- prod.send(msg);
- }
- ssn.commit();
+ _connection = getConnection();
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue(getTestQueueName());
+ _consumer = _session.createConsumer(_queue);
+
+ //Send more messages so it is more likely that the dispatcher is
+ // processing on rollback.
+ sendMessage(_session, _queue, 50);
+ _session.commit();
+
}
public void testOrderingAfterRollback() throws Exception
{
- for (int i = 0; i < 10; i++)
+ //Start the session now so we
+ _connection.start();
+
+ for (int i = 0; i < 20; i++)
{
- TextMessage msg = (TextMessage) cons.receive();
- assertEquals("message 1", msg.getText());
- ssn.rollback();
+ Message msg = _consumer.receive();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ // Pull additional messages through so we have some reject work to do
+ for (int m=0; m < 5 ; m++)
+ {
+ _consumer.receive();
+ }
+
+ System.err.println("ROT-Rollback");
+ _logger.warn("ROT-Rollback");
+ _session.rollback();
}
}
- @Override public void tearDown() throws Exception
+ public void testOrderingAfterRollbackOnMessage() throws Exception
{
- while (true)
+ final CountDownLatch count= new CountDownLatch(20);
+ final Exception exceptions[] = new Exception[20];
+ final AtomicBoolean failed = new AtomicBoolean(false);
+
+ _consumer.setMessageListener(new MessageListener()
{
- Message msg = cons.receiveNoWait();
- if (msg == null)
+
+ public void onMessage(Message message)
{
- break;
+
+ Message msg = message;
+ try
+ {
+ count.countDown();
+ assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+ _session.rollback();
+ }
+ catch (JMSException e)
+ {
+ System.out.println("Error:" + e.getMessage());
+ exceptions[(int)count.getCount()] = e;
+ }
+ catch (AssertionFailedError cf)
+ {
+ // End Test if Equality test fails
+ while (count.getCount() != 0)
+ {
+ count.countDown();
+ }
+
+ System.out.println("Error:" + cf.getMessage());
+ System.err.println(cf.getMessage());
+ cf.printStackTrace();
+ failed.set(true);
+ }
}
- else
+ });
+ //Start the session now so we
+ _connection.start();
+
+ count.await();
+
+ for (Exception e : exceptions)
+ {
+ if (e != null)
{
- msg.acknowledge();
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ failed.set(true);
}
}
- ssn.commit();
+
+// _consumer.close();
+ _connection.close();
+
+ assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+ }
+
+ @Override public void tearDown() throws Exception
+ {
+
+ drainQueue(_queue);
+
super.tearDown();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index dfc3bb7b42..c307176f3f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -37,7 +37,6 @@ import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index 5a5e23baa5..a09589b121 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -1,31 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.client.message;
-import javax.jms.Connection;
-import javax.jms.Destination;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Assert;
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SelectorTest extends QpidTestCase
+public class SelectorTest extends QpidTestCase implements MessageListener
{
- private static final Logger _logger = Logger.getLogger(SelectorTest.class);
+ private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class);
- public void testSelectorWithJMSMessageID() throws Exception
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private int count;
+ public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
+ CountDownLatch _responseLatch = new CountDownLatch(1);
+
+ private static final String BAD_MATHS_SELECTOR = " 1 % 5";
+
+ private static final long RECIEVE_TIMEOUT = 1000;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init((AMQConnection) getConnection("guest", "guest"));
+ }
+
+ private void init(AMQConnection connection) throws JMSException
+ {
+ init(connection, new AMQQueue(connection, getTestQueueName(), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+ }
+
+ public void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ _responseLatch.countDown();
+ }
+
+ public void testUsingOnMessage() throws Exception
+ {
+ String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ // _session.createConsumer(destination).setMessageListener(this);
+ session.createConsumer(_destination, selector).setMessageListener(this);
+
+ try
+ {
+ Message msg = session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ _logger.info("Message sent, waiting for response...");
+
+ _responseLatch.await();
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ // throw new RuntimeException("Did not get message!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+
+ }
+
+ public void testUnparsableSelectors() throws Exception
{
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ boolean caught = false;
- Destination dest = session.createQueue("SelectorQueue");
+ //Try Creating a Browser
+ try
+ {
+ session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Consumer
+ try
+ {
+ session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ //Try Creating a Receiever
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
+
+ try
+ {
+ session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ caught = true;
+ }
+ assertTrue("No exception thrown!", caught);
+ caught = false;
- MessageProducer prod = session.createProducer(dest);
- MessageConsumer consumer = session.createConsumer(dest,"JMSMessageID IS NOT NULL");
+ }
+
+ public void testRuntimeSelectorError() throws JMSException
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1");
+ MessageProducer producer = session.createProducer(_destination);
+ Message sentMsg = session.createTextMessage();
+
+ sentMsg.setIntProperty("testproperty", 1); // 1 % 5
+ producer.send(sentMsg);
+ Message recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNotNull(recvd);
+
+ sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense
+ producer.send(sentMsg);
+ try
+ {
+ recvd = consumer.receive(RECIEVE_TIMEOUT);
+ assertNull(recvd);
+ }
+ catch (Exception e)
+ {
+
+ }
+ assertTrue("Connection should be closed", _connection.isClosed());
+ }
+
+ public void testSelectorWithJMSMessageID() throws Exception
+ {
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(_destination);
+ MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL");
for (int i=0; i<2; i++)
{
@@ -54,7 +271,7 @@ public class SelectorTest extends QpidTestCase
Message msg3 = consumer.receive(1000);
Assert.assertNull("Msg3 should be null", msg3);
session.commit();
- consumer = session.createConsumer(dest,"JMSMessageID IS NULL");
+ consumer = session.createConsumer(_destination,"JMSMessageID IS NULL");
Message msg4 = consumer.receive(1000);
Message msg5 = consumer.receive(1000);
@@ -62,4 +279,32 @@ public class SelectorTest extends QpidTestCase
Assert.assertNotNull("Msg4 should not be null", msg4);
Assert.assertNotNull("Msg5 should not be null", msg5);
}
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.testUsingOnMessage();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
}