summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java65
1 files changed, 41 insertions, 24 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index d7209c5660..5dd56fb0f9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -29,6 +29,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.Message;
import java.io.IOException;
import java.util.List;
@@ -327,22 +328,45 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
int PREFETCH = 15;
//Create new session with small prefetch
- _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+ _session = ((AMQConnection) _connection).createSession(true, Session.SESSION_TRANSACTED, PREFETCH);
MessageConsumer consumer = _session.createConsumer(_queue);
_connection.start();
+ //Start the dispatcher & Unflow the channel.
+ consumer.receiveNoWait();
+
//Fill the prefetch and two extra so that our receive bellow allows the
- // subscription to become active then return to a suspended state.
- sendMessage(_session, _queue, 17);
+ // subscription to become active
+ // Previously we set this to 17 so that it would return to a suspended
+ // state. However, testing has shown that the state change can occur
+ // sufficiently quickly that logging does not occur consistently enough
+ // for testing.
+ int SEND_COUNT = 16;
+ sendMessage(_session, _queue, SEND_COUNT);
_session.commit();
// Retreive the first message, and start the flow of messages
- assertNotNull("First message not retreived", consumer.receive(1000));
+ Message msg = consumer.receive(1000);
+ assertNotNull("First message not retreived", msg);
_session.commit();
-
- _connection.close();
+ // Drain the queue to ensure there is time for the ACTIVE log message
+ // Check that we can received all the messages
+ int receivedCount = 0;
+ while (msg != null)
+ {
+ receivedCount++;
+ msg = consumer.receive(1000);
+ _session.commit();
+ }
+
+ //Validate we received all the messages
+ assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
+
+ // Fill the queue again to suspend the consumer
+ sendMessage(_session, _queue, SEND_COUNT);
+ _session.commit();
//Validate
List<String> results = _monitor.findMatches("SUB-1003");
@@ -350,15 +374,13 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
try
{
// Validation expects three messages.
- // The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
- // The second will be by the connnection as it acknowledges and activates the subscription
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
- // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
- // As a result validating the actor is more complicated and doesn't add anything. The goal of this test is
- // to ensure the State is correct not that a particular Actor performs the logging.
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+ // The Actor can be any one of the following depending on the exactly what is going on on the broker.
+ // Ideally we would test that we can get all of them but setting up
+ // the timing to do this in a consistent way is not benefitial.
+ // Ensuring the State is as expected is sufficient.
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
assertEquals("Result set not expected size:", 3, results.size());
@@ -367,19 +389,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
String log = getLog(results.get(0));
validateSubscriptionState(log, expectedState);
- // Validate that the logActor is the the queue
- String actor = fromActor(log);
- assertTrue("Actor string does not contain expected queue("
- + _queue.getQueueName() + ") name." + actor,
- actor.contains("qu(" + _queue.getQueueName() + ")"));
-
// After being suspended the subscription should become active.
expectedState = "ACTIVE";
log = getLog(results.get(1));
validateSubscriptionState(log, expectedState);
- // Validate we have a connection Actor
- actor = fromActor(log);
- assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
// Validate that it was re-suspended
expectedState = "SUSPENDED";
@@ -396,6 +409,10 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
}
throw afe;
}
+ _connection.close();
+
+ //Ensure the queue is drained before the test ends
+ drainQueue(_queue);
}