diff options
author | Aidan Skinner <aidan@apache.org> | 2009-08-17 15:54:53 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2009-08-17 15:54:53 +0000 |
commit | 0c50acc795dbb205c5ed30977c2eb1e7f089c19b (patch) | |
tree | be492e008e4aeac89d997021c36edfd21f673d32 | |
parent | 45dec2e0b9238d1a870665abec3df9abbb2aad22 (diff) | |
download | qpid-python-0c50acc795dbb205c5ed30977c2eb1e7f089c19b.tar.gz |
QPID-1911, QPID-1912, QPID-1913: make SelectorTest, TopicSessionTest, SelectorTest and SubscriptionLoggingTest all use transactions to stop intermittent timing related test failures.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@805021 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 79 insertions, 38 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java index d97ed71607..d7209c5660 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java @@ -327,7 +327,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging int PREFETCH = 15; //Create new session with small prefetch - _session = ((AMQConnection) _connection).createSession(false, Session.AUTO_ACKNOWLEDGE, PREFETCH); + _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH); MessageConsumer consumer = _session.createConsumer(_queue); @@ -336,16 +336,11 @@ public class SubscriptionLoggingTest extends AbstractTestLogging //Fill the prefetch and two extra so that our receive bellow allows the // subscription to become active then return to a suspended state. sendMessage(_session, _queue, 17); - + _session.commit(); // Retreive the first message, and start the flow of messages assertNotNull("First message not retreived", consumer.receive(1000)); - - //Give the internal broker time to respond to the ack that the above - // receive will perform. - if (!isExternalBroker()) - { - Thread.sleep(1000); - } + _session.commit(); + _connection.close(); @@ -356,7 +351,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging { // Validation expects three messages. // The first will be logged by the QueueActor as part of the processQueue thread -// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED +// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED // The second will be by the connnection as it acknowledges and activates the subscription // INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread @@ -365,7 +360,7 @@ public class SubscriptionLoggingTest extends AbstractTestLogging // INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED // INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED - assertEquals("Result set larger than expected.", 3, results.size()); + assertEquals("Result set not expected size:", 3, results.size()); // Validate Initial Suspension String expectedState = "SUSPENDED"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index af11a94ca3..5a5e23baa5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -20,7 +20,7 @@ public class SelectorTest extends QpidTestCase { Connection conn = getConnection(); conn.start(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("SelectorQueue"); @@ -32,6 +32,7 @@ public class SelectorTest extends QpidTestCase Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } + session.commit(); Message msg1 = consumer.receive(1000); Message msg2 = consumer.receive(1000); @@ -39,6 +40,8 @@ public class SelectorTest extends QpidTestCase Assert.assertNotNull("Msg1 should not be null", msg1); Assert.assertNotNull("Msg2 should not be null", msg2); + session.commit(); + prod.setDisableMessageID(true); for (int i=0; i<2; i++) @@ -47,14 +50,15 @@ public class SelectorTest extends QpidTestCase prod.send(msg); } + session.commit(); Message msg3 = consumer.receive(1000); Assert.assertNull("Msg3 should be null", msg3); - + session.commit(); consumer = session.createConsumer(dest,"JMSMessageID IS NULL"); Message msg4 = consumer.receive(1000); Message msg5 = consumer.receive(1000); - + session.commit(); Assert.assertNotNull("Msg4 should not be null", msg4); Assert.assertNotNull("Msg5 should not be null", msg5); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 0be11011b4..d911bb33d7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -29,6 +29,7 @@ import org.apache.qpid.test.utils.QpidTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; @@ -62,7 +63,7 @@ public class Client implements MessageListener { _connection = connection; _expected = expected; - _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); AMQQueue response = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true); _session.createConsumer(response).setMessageListener(this); @@ -73,6 +74,7 @@ public class Client implements MessageListener request.setJMSReplyTo(response); MessageProducer prod = _session.createProducer(service); prod.send(request); + _session.commit(); } void shutdownWhenComplete() throws Exception @@ -90,6 +92,14 @@ public class Client implements MessageListener notifyAll(); } + try + { + _session.commit(); + } + catch (JMSException e) + { + + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java index 9cd8b183af..ce50ceae19 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -55,7 +55,7 @@ public class Service implements MessageListener { _connection = connection; //AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); - _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); AMQQueue queue = (AMQQueue) _session.createQueue("ServiceQueue") ; _session.createConsumer(queue).setMessageListener(this); _connection.start(); @@ -68,6 +68,7 @@ public class Service implements MessageListener Message response = _session.createTextMessage("Response!"); Destination replyTo = request.getJMSReplyTo(); _session.createProducer(replyTo).send(response); + _session.commit(); } catch (Exception e) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 95b90481c7..f8ba7060a9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -57,7 +57,7 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(topic); @@ -65,10 +65,11 @@ public class TopicSessionTest extends QpidTestCase TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) sub.receive(2000); assertNotNull(tm); - + session1.commit(); session1.unsubscribe("subscription0"); try @@ -104,15 +105,17 @@ public class TopicSessionTest extends QpidTestCase AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); - TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); con.start(); publisher.publish(topic, session1.createTextMessage("hello")); + session1.commit(); TextMessage m = (TextMessage) sub.receive(2000); assertNotNull(m); + session1.commit(); if (shutdown) { @@ -120,17 +123,20 @@ public class TopicSessionTest extends QpidTestCase con.close(); con = (AMQConnection) getConnection("guest", "guest"); con.start(); - session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); publisher = session1.createPublisher(null); } TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0"); publisher.publish(topic, session1.createTextMessage("hello")); + session1.commit(); if (!shutdown) { m = (TextMessage) sub.receive(2000); assertNull(m); + session1.commit(); } publisher.publish(topic2, session1.createTextMessage("goodbye")); + session1.commit(); m = (TextMessage) sub2.receive(2000); assertNotNull(m); assertEquals("goodbye", m.getText()); @@ -143,25 +149,29 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid"); AMQTopic topic = new AMQTopic(con1, "MyTopic3"); - TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con1.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); publisher.publish(session1.createTextMessage("Hello")); + session1.commit(); TextMessage tm = (TextMessage) sub.receive(2000); + session2.commit(); assertNotNull(tm); con2.close(); publisher.publish(session1.createTextMessage("Hello2")); + session1.commit(); con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); tm = (TextMessage) sub.receive(2000); + session2.commit(); assertNotNull(tm); assertEquals("Hello2", tm.getText()); session2.unsubscribe("subscription0"); @@ -174,12 +184,13 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic4"); - TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); con.start(); TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(10000L); assertNotNull(tm); String msgText = tm.getText(); @@ -188,15 +199,19 @@ public class TopicSessionTest extends QpidTestCase msgText = tm.getText(); assertNull(msgText); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(10000L); assertNotNull(tm); + session1.commit(); msgText = tm.getText(); assertNull(msgText); tm.clearBody(); tm.setText("Now we are not null"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(2000); assertNotNull(tm); + session1.commit(); msgText = tm.getText(); assertEquals("Now we are not null", msgText); @@ -204,7 +219,9 @@ public class TopicSessionTest extends QpidTestCase msgText = tm.getText(); assertEquals("Empty string not returned", "", msgText); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(2000); + session1.commit(); assertNotNull(tm); assertEquals("Empty string not returned", "", msgText); con.close(); @@ -213,7 +230,7 @@ public class TopicSessionTest extends QpidTestCase public void testSendingSameMessage() throws Exception { AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); TopicPublisher producer = session.createPublisher(topic); @@ -221,14 +238,16 @@ public class TopicSessionTest extends QpidTestCase conn.start(); TextMessage sentMessage = session.createTextMessage("Test Message"); producer.send(sentMessage); + session.commit(); TextMessage receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); assertEquals(sentMessage.getText(), receivedMessage.getText()); producer.send(sentMessage); + session.commit(); receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); assertEquals(sentMessage.getText(), receivedMessage.getText()); - + session.commit(); conn.close(); } @@ -236,17 +255,18 @@ public class TopicSessionTest extends QpidTestCase public void testTemporaryTopic() throws Exception { AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); TopicPublisher producer = session.createPublisher(topic); MessageConsumer consumer = session.createConsumer(topic); conn.start(); producer.send(session.createTextMessage("hello")); + session.commit(); TextMessage tm = (TextMessage) consumer.receive(2000); assertNotNull(tm); assertEquals("hello", tm.getText()); - + session.commit(); try { topic.delete(); @@ -291,7 +311,7 @@ public class TopicSessionTest extends QpidTestCase AMQTopic topic = new AMQTopic(con, "testNoLocal"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false); TopicSubscriber normal = session1.createSubscriber(topic); @@ -304,15 +324,17 @@ public class TopicSessionTest extends QpidTestCase //send message to all consumers publisher.publish(session1.createTextMessage("hello-new2")); - + session1.commit(); //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); - + session1.commit(); + //test selector subscriber doesn't message m = (TextMessage) select.receive(1000); assertNull(m); - + session1.commit(); + //test nolocal subscriber doesn't message m = (TextMessage) noLocal.receive(1000); if (m != null) @@ -326,21 +348,24 @@ public class TopicSessionTest extends QpidTestCase message.setStringProperty("Selector", "select"); publisher.publish(message); - + session1.commit(); + //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); - + session1.commit(); + //test selector subscriber does get message m = (TextMessage) select.receive(1000); assertNotNull(m); + session1.commit(); //test nolocal subscriber doesn't message m = (TextMessage) noLocal.receive(100); assertNull(m); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher2 = session2.createPublisher(topic); @@ -348,14 +373,17 @@ public class TopicSessionTest extends QpidTestCase message.setStringProperty("Selector", "select"); publisher2.publish(message); + session2.commit(); //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); + session1.commit(); //test selector subscriber does get message m = (TextMessage) select.receive(1000); assertNotNull(m); + session1.commit(); //test nolocal subscriber does message m = (TextMessage) noLocal.receive(100); @@ -378,7 +406,7 @@ public class TopicSessionTest extends QpidTestCase // Setup Topic AMQTopic topic = new AMQTopic(con, "testNoLocal"); - TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); // Setup subscriber with selector TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); @@ -391,13 +419,15 @@ public class TopicSessionTest extends QpidTestCase // Send non-matching message message = session.createTextMessage("non-matching 1"); publisher.publish(message); + session.commit(); // Send and consume matching message message = session.createTextMessage("hello"); message.setStringProperty("Selector", "select"); publisher.publish(message); - + session.commit(); + m = (TextMessage) selector.receive(1000); assertNotNull("should have received message", m); assertEquals("Message contents were wrong", "hello", m.getText()); @@ -405,7 +435,8 @@ public class TopicSessionTest extends QpidTestCase // Send non-matching message message = session.createTextMessage("non-matching 2"); publisher.publish(message); - + session.commit(); + // Assert queue count is 0 long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); assertEquals("Queue depth was wrong", 0, depth); |