summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-08-17 15:54:53 +0000
committerAidan Skinner <aidan@apache.org>2009-08-17 15:54:53 +0000
commit0c50acc795dbb205c5ed30977c2eb1e7f089c19b (patch)
treebe492e008e4aeac89d997021c36edfd21f673d32
parent45dec2e0b9238d1a870665abec3df9abbb2aad22 (diff)
downloadqpid-python-0c50acc795dbb205c5ed30977c2eb1e7f089c19b.tar.gz
QPID-1911, QPID-1912, QPID-1913: make SelectorTest, TopicSessionTest, SelectorTest and SubscriptionLoggingTest all use transactions to stop intermittent timing related test failures.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@805021 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java17
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java10
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java75
5 files changed, 79 insertions, 38 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
index d97ed71607..d7209c5660 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
@@ -327,7 +327,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
int PREFETCH = 15;
//Create new session with small prefetch
- _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+ _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
MessageConsumer consumer = _session.createConsumer(_queue);
@@ -336,16 +336,11 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
//Fill the prefetch and two extra so that our receive bellow allows the
// subscription to become active then return to a suspended state.
sendMessage(_session, _queue, 17);
-
+ _session.commit();
// Retreive the first message, and start the flow of messages
assertNotNull("First message not retreived", consumer.receive(1000));
-
- //Give the internal broker time to respond to the ack that the above
- // receive will perform.
- if (!isExternalBroker())
- {
- Thread.sleep(1000);
- }
+ _session.commit();
+
_connection.close();
@@ -356,7 +351,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
{
// Validation expects three messages.
// The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
// The second will be by the connnection as it acknowledges and activates the subscription
// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
// The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
@@ -365,7 +360,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging
// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
- assertEquals("Result set larger than expected.", 3, results.size());
+ assertEquals("Result set not expected size:", 3, results.size());
// Validate Initial Suspension
String expectedState = "SUSPENDED";
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index af11a94ca3..5a5e23baa5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -20,7 +20,7 @@ public class SelectorTest extends QpidTestCase
{
Connection conn = getConnection();
conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue("SelectorQueue");
@@ -32,6 +32,7 @@ public class SelectorTest extends QpidTestCase
Message msg = session.createTextMessage("Msg" + String.valueOf(i));
prod.send(msg);
}
+ session.commit();
Message msg1 = consumer.receive(1000);
Message msg2 = consumer.receive(1000);
@@ -39,6 +40,8 @@ public class SelectorTest extends QpidTestCase
Assert.assertNotNull("Msg1 should not be null", msg1);
Assert.assertNotNull("Msg2 should not be null", msg2);
+ session.commit();
+
prod.setDisableMessageID(true);
for (int i=0; i<2; i++)
@@ -47,14 +50,15 @@ public class SelectorTest extends QpidTestCase
prod.send(msg);
}
+ session.commit();
Message msg3 = consumer.receive(1000);
Assert.assertNull("Msg3 should be null", msg3);
-
+ session.commit();
consumer = session.createConsumer(dest,"JMSMessageID IS NULL");
Message msg4 = consumer.receive(1000);
Message msg5 = consumer.receive(1000);
-
+ session.commit();
Assert.assertNotNull("Msg4 should not be null", msg4);
Assert.assertNotNull("Msg5 should not be null", msg5);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index 0be11011b4..d911bb33d7 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -29,6 +29,7 @@ import org.apache.qpid.test.utils.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
@@ -62,7 +63,7 @@ public class Client implements MessageListener
{
_connection = connection;
_expected = expected;
- _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
AMQQueue response =
new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true);
_session.createConsumer(response).setMessageListener(this);
@@ -73,6 +74,7 @@ public class Client implements MessageListener
request.setJMSReplyTo(response);
MessageProducer prod = _session.createProducer(service);
prod.send(request);
+ _session.commit();
}
void shutdownWhenComplete() throws Exception
@@ -90,6 +92,14 @@ public class Client implements MessageListener
notifyAll();
}
+ try
+ {
+ _session.commit();
+ }
+ catch (JMSException e)
+ {
+
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
index 9cd8b183af..ce50ceae19 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -55,7 +55,7 @@ public class Service implements MessageListener
{
_connection = connection;
//AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
- _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE);
AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ;
_session.createConsumer(queue).setMessageListener(this);
_connection.start();
@@ -68,6 +68,7 @@ public class Service implements MessageListener
Message response = _session.createTextMessage("Response!");
Destination replyTo = request.getJMSReplyTo();
_session.createProducer(replyTo).send(response);
+ _session.commit();
}
catch (Exception e)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 95b90481c7..f8ba7060a9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -57,7 +57,7 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -65,10 +65,11 @@ public class TopicSessionTest extends QpidTestCase
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) sub.receive(2000);
assertNotNull(tm);
-
+ session1.commit();
session1.unsubscribe("subscription0");
try
@@ -104,15 +105,17 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
- TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
con.start();
publisher.publish(topic, session1.createTextMessage("hello"));
+ session1.commit();
TextMessage m = (TextMessage) sub.receive(2000);
assertNotNull(m);
+ session1.commit();
if (shutdown)
{
@@ -120,17 +123,20 @@ public class TopicSessionTest extends QpidTestCase
con.close();
con = (AMQConnection) getConnection("guest", "guest");
con.start();
- session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
}
TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
publisher.publish(topic, session1.createTextMessage("hello"));
+ session1.commit();
if (!shutdown)
{
m = (TextMessage) sub.receive(2000);
assertNull(m);
+ session1.commit();
}
publisher.publish(topic2, session1.createTextMessage("goodbye"));
+ session1.commit();
m = (TextMessage) sub2.receive(2000);
assertNotNull(m);
assertEquals("goodbye", m.getText());
@@ -143,25 +149,29 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid");
AMQTopic topic = new AMQTopic(con1, "MyTopic3");
- TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con1.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
publisher.publish(session1.createTextMessage("Hello"));
+ session1.commit();
TextMessage tm = (TextMessage) sub.receive(2000);
+ session2.commit();
assertNotNull(tm);
con2.close();
publisher.publish(session1.createTextMessage("Hello2"));
+ session1.commit();
con2 = (AMQConnection) getConnection("guest", "guest", "clientid");
- session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
tm = (TextMessage) sub.receive(2000);
+ session2.commit();
assertNotNull(tm);
assertEquals("Hello2", tm.getText());
session2.unsubscribe("subscription0");
@@ -174,12 +184,13 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic4");
- TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
con.start();
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(10000L);
assertNotNull(tm);
String msgText = tm.getText();
@@ -188,15 +199,19 @@ public class TopicSessionTest extends QpidTestCase
msgText = tm.getText();
assertNull(msgText);
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(10000L);
assertNotNull(tm);
+ session1.commit();
msgText = tm.getText();
assertNull(msgText);
tm.clearBody();
tm.setText("Now we are not null");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(2000);
assertNotNull(tm);
+ session1.commit();
msgText = tm.getText();
assertEquals("Now we are not null", msgText);
@@ -204,7 +219,9 @@ public class TopicSessionTest extends QpidTestCase
msgText = tm.getText();
assertEquals("Empty string not returned", "", msgText);
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(2000);
+ session1.commit();
assertNotNull(tm);
assertEquals("Empty string not returned", "", msgText);
con.close();
@@ -213,7 +230,7 @@ public class TopicSessionTest extends QpidTestCase
public void testSendingSameMessage() throws Exception
{
AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
TopicPublisher producer = session.createPublisher(topic);
@@ -221,14 +238,16 @@ public class TopicSessionTest extends QpidTestCase
conn.start();
TextMessage sentMessage = session.createTextMessage("Test Message");
producer.send(sentMessage);
+ session.commit();
TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(), receivedMessage.getText());
producer.send(sentMessage);
+ session.commit();
receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(), receivedMessage.getText());
-
+ session.commit();
conn.close();
}
@@ -236,17 +255,18 @@ public class TopicSessionTest extends QpidTestCase
public void testTemporaryTopic() throws Exception
{
AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
TopicPublisher producer = session.createPublisher(topic);
MessageConsumer consumer = session.createConsumer(topic);
conn.start();
producer.send(session.createTextMessage("hello"));
+ session.commit();
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
assertEquals("hello", tm.getText());
-
+ session.commit();
try
{
topic.delete();
@@ -291,7 +311,7 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "testNoLocal");
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber noLocal = session1.createSubscriber(topic, "", true);
TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false);
TopicSubscriber normal = session1.createSubscriber(topic);
@@ -304,15 +324,17 @@ public class TopicSessionTest extends QpidTestCase
//send message to all consumers
publisher.publish(session1.createTextMessage("hello-new2"));
-
+ session1.commit();
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
-
+ session1.commit();
+
//test selector subscriber doesn't message
m = (TextMessage) select.receive(1000);
assertNull(m);
-
+ session1.commit();
+
//test nolocal subscriber doesn't message
m = (TextMessage) noLocal.receive(1000);
if (m != null)
@@ -326,21 +348,24 @@ public class TopicSessionTest extends QpidTestCase
message.setStringProperty("Selector", "select");
publisher.publish(message);
-
+ session1.commit();
+
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
-
+ session1.commit();
+
//test selector subscriber does get message
m = (TextMessage) select.receive(1000);
assertNotNull(m);
+ session1.commit();
//test nolocal subscriber doesn't message
m = (TextMessage) noLocal.receive(100);
assertNull(m);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher2 = session2.createPublisher(topic);
@@ -348,14 +373,17 @@ public class TopicSessionTest extends QpidTestCase
message.setStringProperty("Selector", "select");
publisher2.publish(message);
+ session2.commit();
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
+ session1.commit();
//test selector subscriber does get message
m = (TextMessage) select.receive(1000);
assertNotNull(m);
+ session1.commit();
//test nolocal subscriber does message
m = (TextMessage) noLocal.receive(100);
@@ -378,7 +406,7 @@ public class TopicSessionTest extends QpidTestCase
// Setup Topic
AMQTopic topic = new AMQTopic(con, "testNoLocal");
- TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
// Setup subscriber with selector
TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
@@ -391,13 +419,15 @@ public class TopicSessionTest extends QpidTestCase
// Send non-matching message
message = session.createTextMessage("non-matching 1");
publisher.publish(message);
+ session.commit();
// Send and consume matching message
message = session.createTextMessage("hello");
message.setStringProperty("Selector", "select");
publisher.publish(message);
-
+ session.commit();
+
m = (TextMessage) selector.receive(1000);
assertNotNull("should have received message", m);
assertEquals("Message contents were wrong", "hello", m.getText());
@@ -405,7 +435,8 @@ public class TopicSessionTest extends QpidTestCase
// Send non-matching message
message = session.createTextMessage("non-matching 2");
publisher.publish(message);
-
+ session.commit();
+
// Assert queue count is 0
long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
assertEquals("Queue depth was wrong", 0, depth);