summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java28
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java65
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java88
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java119
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java59
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java20
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java73
9 files changed, 353 insertions, 105 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
index 7a2266902b..b4ba6e8156 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.logging.AbstractTestLogging;
import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
@@ -38,6 +40,8 @@ import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Test class to test if any change in the broker JMX code is affesting the management console
@@ -161,6 +165,23 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
//Create a connection to the broker
Connection connection = getConnection();
+ // Monitor the connection for an exception being thrown
+ // this should be a DisconnectionException but it is not this tests
+ // job to valiate that. Only use the exception as a synchronisation
+ // to check the log file for the Close message
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ //Failover being attempted.
+ exceptionReceived.countDown();
+ }
+ });
+
+ //Remove the connection close from any 0-10 connections
+ _monitor.reset();
+
// Get all active AMQP connections
AllObjects allObject = new AllObjects(_mbsc);
allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,*";
@@ -175,16 +196,17 @@ public class ManagementActorLoggingTest extends AbstractTestLogging
newProxyInstance(_mbsc, connectionName,
ManagedConnection.class, false);
- //Remove the connection close from any 0-10 connections
- _monitor.reset();
//Close the connection
mangedConnection.closeConnection();
+ //Wait for the connection to close
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
+
//Validate results
List<String> results = _monitor.findMatches("CON-1002");
-
assertEquals("Unexpected Connection Close count", 1, results.size());
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index d7209c5660..5dd56fb0f9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -29,6 +29,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.Message;
import java.io.IOException;
import java.util.List;
@@ -327,22 +328,45 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
int PREFETCH = 15;
//Create new session with small prefetch
- _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+ _session = ((AMQConnection) _connection).createSession(true, Session.SESSION_TRANSACTED, PREFETCH);
MessageConsumer consumer = _session.createConsumer(_queue);
_connection.start();
+ //Start the dispatcher & Unflow the channel.
+ consumer.receiveNoWait();
+
//Fill the prefetch and two extra so that our receive bellow allows the
- // subscription to become active then return to a suspended state.
- sendMessage(_session, _queue, 17);
+ // subscription to become active
+ // Previously we set this to 17 so that it would return to a suspended
+ // state. However, testing has shown that the state change can occur
+ // sufficiently quickly that logging does not occur consistently enough
+ // for testing.
+ int SEND_COUNT = 16;
+ sendMessage(_session, _queue, SEND_COUNT);
_session.commit();
// Retreive the first message, and start the flow of messages
- assertNotNull("First message not retreived", consumer.receive(1000));
+ Message msg = consumer.receive(1000);
+ assertNotNull("First message not retreived", msg);
_session.commit();
-
- _connection.close();
+ // Drain the queue to ensure there is time for the ACTIVE log message
+ // Check that we can received all the messages
+ int receivedCount = 0;
+ while (msg != null)
+ {
+ receivedCount++;
+ msg = consumer.receive(1000);
+ _session.commit();
+ }
+
+ //Validate we received all the messages
+ assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
+
+ // Fill the queue again to suspend the consumer
+ sendMessage(_session, _queue, SEND_COUNT);
+ _session.commit();
//Validate
List<String> results = _monitor.findMatches("SUB-1003");
@@ -350,15 +374,13 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
try
{
// Validation expects three messages.
- // The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
- // The second will be by the connnection as it acknowledges and activates the subscription
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
- // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
- // As a result validating the actor is more complicated and doesn't add anything. The goal of this test is
- // to ensure the State is correct not that a particular Actor performs the logging.
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The Actor can be any one of the following depending on the exactly what is going on on the broker.
+ // Ideally we would test that we can get all of them but setting up
+ // the timing to do this in a consistent way is not benefitial.
+ // Ensuring the State is as expected is sufficient.
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
assertEquals("Result set not expected size:", 3, results.size());
@@ -367,19 +389,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
String log = getLog(results.get(0));
validateSubscriptionState(log, expectedState);
- // Validate that the logActor is the the queue
- String actor = fromActor(log);
- assertTrue("Actor string does not contain expected queue("
- + _queue.getQueueName() + ") name." + actor,
- actor.contains("qu(" + _queue.getQueueName() + ")"));
-
// After being suspended the subscription should become active.
expectedState = "ACTIVE";
log = getLog(results.get(1));
validateSubscriptionState(log, expectedState);
- // Validate we have a connection Actor
- actor = fromActor(log);
- assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
// Validate that it was re-suspended
expectedState = "SUSPENDED";
@@ -396,6 +409,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
}
throw afe;
}
+ _connection.close();
+
+ //Ensure the queue is drained before the test ends
+ drainQueue(_queue);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
new file mode 100644
index 0000000000..c9810e7304
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * QPID-155
+ *
+ * Test to validate that setting the respective qpid.declare_queues,
+ * qpid.declare_exchanges system properties functions as expected.
+ *
+ */
+public class DynamicQueueExchangeCreateTest extends QpidTestCase
+{
+
+ public void testQueueDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_queues", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(getTestQueueName());
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the queue does not exist");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Exception should be that the queue does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("does not exist"));
+
+ }
+ }
+
+ public void testExchangeDeclare() throws Exception
+ {
+ setSystemProperty("qpid.declare_exchanges", "false");
+
+ Connection connection = getConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String EXCHANGE_TYPE = "test.direct";
+ Queue queue = session.createQueue("direct://" + EXCHANGE_TYPE + "/queue/queue");
+
+ try
+ {
+ session.createConsumer(queue);
+ fail("JMSException should be thrown as the exchange does not exist");
+ }
+ catch (JMSException e)
+ {
+ assertTrue("Exception should be that the exchange does not exist :" +
+ e.getMessage(),
+ e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist"));
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
new file mode 100644
index 0000000000..3fb6cd3526
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.Session;
+
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ * MainThread Exception(Mina)Thread
+ * | |
+ * Performs |
+ * ACtion |
+ * | Receives Server
+ * | Close
+ * Blocks for |
+ * Response |
+ * | Starts To Notify
+ * | client
+ * | |
+ * | <----- Notify Main Thread
+ * Notification |
+ * wakes client |
+ * | |
+ * Client then |
+ * processes Error. |
+ * | |
+ * Potentially Attempting Close Channel/Connection
+ * Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *
+ */
+public class JavaServerCloseRaceConditionTest extends QpidTestCase
+{
+ private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
+ public void test() throws Exception
+ {
+
+ AMQConnection connection = (AMQConnection) getConnection();
+
+ AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set no wait true so that we block the connection
+ // Also set a different exchange class string so the attempt to declare
+ // the exchange causes an exchange.
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+ true, false, false, false, true, null);
+
+ AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
+
+ try
+ {
+ // block our thread so that can times out
+ connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ try
+ {
+ // Depending on if the notification thread has closed the connection
+ // or not we may get an exception here when we attempt to close the
+ // connection. If we do get one then it should be the same as above
+ // an AMQAuthenticationException.
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c5cdb83bbf..2a44413ac8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber;
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+
+ /** Timeout for receive() if we are expecting a message */
+ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+
+ /** Timeout for receive() if we are not expecting a message */
+ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- _logger.info("Receive message on consumer 1:expecting A");
- msg = consumer2.receive();
+ _logger.info("Receive message on consumer 2:expecting A");
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
@@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Close connection");
@@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- msg = consumer2.receive();
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
consumer2.close();
@@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
- msg = consumer2.receive();
- assertNotNull(msg);
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(500);
assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
// Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase
assertNotNull("Subscriber should have been created", liveSubscriber);
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Send 1 matching message and 1 non-matching message
sendMatchingAndNonMatchingMessage(session, producer);
- Message rMsg = subA.receive(1000);
+ Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector1",
((TextMessage) rMsg).getText());
- rMsg = subA.receive(1000);
+ rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
// Disconnect subscriber
@@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Check messages are recieved properly
sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelector");
}
@@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 9c755fcb41..b603455644 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -479,7 +479,7 @@ public class CommitRollbackTest extends QpidTestCase
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_pubSession.commit();
- assertNotNull(_consumer.receive(100));
+ assertNotNull(_consumer.receive(1000));
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index cc9cfce34b..1bef07fcd5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -55,7 +55,7 @@ public class FailoverBaseCase extends QpidTestCase
{
super.setUp();
setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
- startBroker(FAILING_PORT);
+ startBroker(failingPort);
}
/**
@@ -76,7 +76,7 @@ public class FailoverBaseCase extends QpidTestCase
public void tearDown() throws Exception
{
- stopBroker(FAILING_PORT);
+ stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
super.tearDown();
FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
index df8dd0b85b..44ac5b4838 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -118,7 +119,7 @@ public class LogMonitor
* @throws java.io.FileNotFoundException if the Log file can nolonger be found
* @throws IOException thrown when reading the log file
*/
- public boolean waitForMessage(String message, long wait)
+ public boolean waitForMessage(String message, long wait, boolean printFileOnFailure)
throws FileNotFoundException, IOException
{
// Loop through alerts until we're done or wait ms seconds have passed,
@@ -126,20 +127,35 @@ public class LogMonitor
BufferedReader reader = new BufferedReader(new FileReader(_logfile));
boolean found = false;
long endtime = System.currentTimeMillis() + wait;
+ ArrayList<String> contents = new ArrayList<String>();
while (!found && System.currentTimeMillis() < endtime)
{
while (reader.ready())
{
String line = reader.readLine();
+ contents.add(line);
if (line.contains(message))
{
found = true;
}
}
}
-
+ if (!found && printFileOnFailure)
+ {
+ for (String line : contents)
+ {
+ System.out.println(line);
+ }
+ }
return found;
}
+
+
+ public boolean waitForMessage(String messageCountAlert, long alertLogWaitPeriod) throws FileNotFoundException, IOException
+ {
+ return waitForMessage(messageCountAlert, alertLogWaitPeriod, true);
+ }
+
/**
* Read the log file in to memory as a String
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
index d1a0df30a9..2b9fe8e039 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
@@ -30,25 +30,25 @@ import java.util.List;
public class LogMonitorTest extends TestCase
{
+ private LogMonitor _monitor;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ _monitor = new LogMonitor();
+ _monitor.getMonitoredFile().deleteOnExit(); // Make sure we clean up
+ }
+
/**
* Test that a new file is created when attempting to set up a monitor with
* the default constructor.
*/
public void testMonitor()
{
- // Validate that a NPE is thrown with null input
- try
- {
- LogMonitor montior = new LogMonitor();
- //Validte that the monitor is now running on a new file
- assertTrue("New file does not have correct name:" + montior.
- getMonitoredFile().getName(),
- montior.getMonitoredFile().getName().contains("LogMonitor"));
- }
- catch (IOException ioe)
- {
- fail("IOE thrown:" + ioe);
- }
+ //Validate that the monitor is now running on a new file
+ assertTrue("New file does not have correct name:" + _monitor.
+ getMonitoredFile().getName(),
+ _monitor.getMonitoredFile().getName().contains("LogMonitor"));
}
/**
@@ -63,13 +63,11 @@ public class LogMonitorTest extends TestCase
File testFile = File.createTempFile("testMonitorFile", ".log");
testFile.deleteOnExit();
- LogMonitor monitor;
-
//Ensure that we can create a monitor on a file
try
{
- monitor = new LogMonitor(testFile);
- assertEquals(testFile, monitor.getMonitoredFile());
+ _monitor = new LogMonitor(testFile);
+ assertEquals(testFile, _monitor.getMonitoredFile());
}
catch (IOException ioe)
{
@@ -136,13 +134,12 @@ public class LogMonitorTest extends TestCase
*/
public void testFindMatches_Match() throws IOException
{
- LogMonitor monitor = new LogMonitor();
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- validateLogContainsMessage(monitor, message);
+ validateLogContainsMessage(_monitor, message);
}
/**
@@ -152,35 +149,17 @@ public class LogMonitorTest extends TestCase
*/
public void testFindMatches_NoMatch() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
String notLogged = "This text was not logged";
- validateLogDoesNotContainsMessage(monitor, notLogged);
- }
-
- public void testWaitForMessage_Found() throws IOException
- {
- LogMonitor monitor = new LogMonitor();
-
- String message = getName() + ": Test Message";
-
- long TIME_OUT = 2000;
-
- logMessageWithDelay(message, TIME_OUT / 2);
-
- assertTrue("Message was not logged ",
- monitor.waitForMessage(message, TIME_OUT));
+ validateLogDoesNotContainsMessage(_monitor, notLogged);
}
public void testWaitForMessage_Timeout() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
long TIME_OUT = 2000;
@@ -189,41 +168,37 @@ public class LogMonitorTest extends TestCase
// Verify that we can time out waiting for a message
assertFalse("Message was logged ",
- monitor.waitForMessage(message, TIME_OUT / 2));
+ _monitor.waitForMessage(message, TIME_OUT / 2, false));
// Verify that the message did eventually get logged.
assertTrue("Message was never logged.",
- monitor.waitForMessage(message, TIME_OUT));
+ _monitor.waitForMessage(message, TIME_OUT));
}
public void testReset() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- validateLogContainsMessage(monitor, message);
+ validateLogContainsMessage(_monitor, message);
String LOG_RESET_TEXT = "Log Monitor Reset";
- validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT);
+ validateLogDoesNotContainsMessage(_monitor, LOG_RESET_TEXT);
- monitor.reset();
+ _monitor.reset();
- assertEquals("", monitor.readFile());
+ assertEquals("", _monitor.readFile());
}
public void testRead() throws IOException
{
- LogMonitor monitor = new LogMonitor();
-
String message = getName() + ": Test Message";
Logger.getRootLogger().warn(message);
- String fileContents = monitor.readFile();
+ String fileContents = _monitor.readFile();
assertTrue("Logged message not found when reading file.",
fileContents.contains(message));