diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java | 75 |
1 files changed, 53 insertions, 22 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 95b90481c7..f8ba7060a9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -57,7 +57,7 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(topic); @@ -65,10 +65,11 @@ public class TopicSessionTest extends QpidTestCase TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) sub.receive(2000); assertNotNull(tm); - + session1.commit(); session1.unsubscribe("subscription0"); try @@ -104,15 +105,17 @@ public class TopicSessionTest extends QpidTestCase AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); - TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); con.start(); publisher.publish(topic, session1.createTextMessage("hello")); + session1.commit(); TextMessage m = (TextMessage) sub.receive(2000); assertNotNull(m); + session1.commit(); if (shutdown) { @@ -120,17 +123,20 @@ public class TopicSessionTest extends QpidTestCase con.close(); con = (AMQConnection) getConnection("guest", "guest"); con.start(); - session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); publisher = session1.createPublisher(null); } TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0"); publisher.publish(topic, session1.createTextMessage("hello")); + session1.commit(); if (!shutdown) { m = (TextMessage) sub.receive(2000); assertNull(m); + session1.commit(); } publisher.publish(topic2, session1.createTextMessage("goodbye")); + session1.commit(); m = (TextMessage) sub2.receive(2000); assertNotNull(m); assertEquals("goodbye", m.getText()); @@ -143,25 +149,29 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid"); AMQTopic topic = new AMQTopic(con1, "MyTopic3"); - TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con1.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); publisher.publish(session1.createTextMessage("Hello")); + session1.commit(); TextMessage tm = (TextMessage) sub.receive(2000); + session2.commit(); assertNotNull(tm); con2.close(); publisher.publish(session1.createTextMessage("Hello2")); + session1.commit(); con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); tm = (TextMessage) sub.receive(2000); + session2.commit(); assertNotNull(tm); assertEquals("Hello2", tm.getText()); session2.unsubscribe("subscription0"); @@ -174,12 +184,13 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic4"); - TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); con.start(); TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(10000L); assertNotNull(tm); String msgText = tm.getText(); @@ -188,15 +199,19 @@ public class TopicSessionTest extends QpidTestCase msgText = tm.getText(); assertNull(msgText); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(10000L); assertNotNull(tm); + session1.commit(); msgText = tm.getText(); assertNull(msgText); tm.clearBody(); tm.setText("Now we are not null"); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(2000); assertNotNull(tm); + session1.commit(); msgText = tm.getText(); assertEquals("Now we are not null", msgText); @@ -204,7 +219,9 @@ public class TopicSessionTest extends QpidTestCase msgText = tm.getText(); assertEquals("Empty string not returned", "", msgText); publisher.publish(tm); + session1.commit(); tm = (TextMessage) consumer1.receive(2000); + session1.commit(); assertNotNull(tm); assertEquals("Empty string not returned", "", msgText); con.close(); @@ -213,7 +230,7 @@ public class TopicSessionTest extends QpidTestCase public void testSendingSameMessage() throws Exception { AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); TopicPublisher producer = session.createPublisher(topic); @@ -221,14 +238,16 @@ public class TopicSessionTest extends QpidTestCase conn.start(); TextMessage sentMessage = session.createTextMessage("Test Message"); producer.send(sentMessage); + session.commit(); TextMessage receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); assertEquals(sentMessage.getText(), receivedMessage.getText()); producer.send(sentMessage); + session.commit(); receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); assertEquals(sentMessage.getText(), receivedMessage.getText()); - + session.commit(); conn.close(); } @@ -236,17 +255,18 @@ public class TopicSessionTest extends QpidTestCase public void testTemporaryTopic() throws Exception { AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); TopicPublisher producer = session.createPublisher(topic); MessageConsumer consumer = session.createConsumer(topic); conn.start(); producer.send(session.createTextMessage("hello")); + session.commit(); TextMessage tm = (TextMessage) consumer.receive(2000); assertNotNull(tm); assertEquals("hello", tm.getText()); - + session.commit(); try { topic.delete(); @@ -291,7 +311,7 @@ public class TopicSessionTest extends QpidTestCase AMQTopic topic = new AMQTopic(con, "testNoLocal"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false); TopicSubscriber normal = session1.createSubscriber(topic); @@ -304,15 +324,17 @@ public class TopicSessionTest extends QpidTestCase //send message to all consumers publisher.publish(session1.createTextMessage("hello-new2")); - + session1.commit(); //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); - + session1.commit(); + //test selector subscriber doesn't message m = (TextMessage) select.receive(1000); assertNull(m); - + session1.commit(); + //test nolocal subscriber doesn't message m = (TextMessage) noLocal.receive(1000); if (m != null) @@ -326,21 +348,24 @@ public class TopicSessionTest extends QpidTestCase message.setStringProperty("Selector", "select"); publisher.publish(message); - + session1.commit(); + //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); - + session1.commit(); + //test selector subscriber does get message m = (TextMessage) select.receive(1000); assertNotNull(m); + session1.commit(); //test nolocal subscriber doesn't message m = (TextMessage) noLocal.receive(100); assertNull(m); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher2 = session2.createPublisher(topic); @@ -348,14 +373,17 @@ public class TopicSessionTest extends QpidTestCase message.setStringProperty("Selector", "select"); publisher2.publish(message); + session2.commit(); //test normal subscriber gets message m = (TextMessage) normal.receive(1000); assertNotNull(m); + session1.commit(); //test selector subscriber does get message m = (TextMessage) select.receive(1000); assertNotNull(m); + session1.commit(); //test nolocal subscriber does message m = (TextMessage) noLocal.receive(100); @@ -378,7 +406,7 @@ public class TopicSessionTest extends QpidTestCase // Setup Topic AMQTopic topic = new AMQTopic(con, "testNoLocal"); - TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); // Setup subscriber with selector TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); @@ -391,13 +419,15 @@ public class TopicSessionTest extends QpidTestCase // Send non-matching message message = session.createTextMessage("non-matching 1"); publisher.publish(message); + session.commit(); // Send and consume matching message message = session.createTextMessage("hello"); message.setStringProperty("Selector", "select"); publisher.publish(message); - + session.commit(); + m = (TextMessage) selector.receive(1000); assertNotNull("should have received message", m); assertEquals("Message contents were wrong", "hello", m.getText()); @@ -405,7 +435,8 @@ public class TopicSessionTest extends QpidTestCase // Send non-matching message message = session.createTextMessage("non-matching 2"); publisher.publish(message); - + session.commit(); + // Assert queue count is 0 long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); assertEquals("Queue depth was wrong", 0, depth); |