summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java75
1 files changed, 53 insertions, 22 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 95b90481c7..f8ba7060a9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -57,7 +57,7 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -65,10 +65,11 @@ public class TopicSessionTest extends QpidTestCase
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) sub.receive(2000);
assertNotNull(tm);
-
+ session1.commit();
session1.unsubscribe("subscription0");
try
@@ -104,15 +105,17 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
- TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
con.start();
publisher.publish(topic, session1.createTextMessage("hello"));
+ session1.commit();
TextMessage m = (TextMessage) sub.receive(2000);
assertNotNull(m);
+ session1.commit();
if (shutdown)
{
@@ -120,17 +123,20 @@ public class TopicSessionTest extends QpidTestCase
con.close();
con = (AMQConnection) getConnection("guest", "guest");
con.start();
- session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
}
TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
publisher.publish(topic, session1.createTextMessage("hello"));
+ session1.commit();
if (!shutdown)
{
m = (TextMessage) sub.receive(2000);
assertNull(m);
+ session1.commit();
}
publisher.publish(topic2, session1.createTextMessage("goodbye"));
+ session1.commit();
m = (TextMessage) sub2.receive(2000);
assertNotNull(m);
assertEquals("goodbye", m.getText());
@@ -143,25 +149,29 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid");
AMQTopic topic = new AMQTopic(con1, "MyTopic3");
- TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con1.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
publisher.publish(session1.createTextMessage("Hello"));
+ session1.commit();
TextMessage tm = (TextMessage) sub.receive(2000);
+ session2.commit();
assertNotNull(tm);
con2.close();
publisher.publish(session1.createTextMessage("Hello2"));
+ session1.commit();
con2 = (AMQConnection) getConnection("guest", "guest", "clientid");
- session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
tm = (TextMessage) sub.receive(2000);
+ session2.commit();
assertNotNull(tm);
assertEquals("Hello2", tm.getText());
session2.unsubscribe("subscription0");
@@ -174,12 +184,13 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic4");
- TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
con.start();
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(10000L);
assertNotNull(tm);
String msgText = tm.getText();
@@ -188,15 +199,19 @@ public class TopicSessionTest extends QpidTestCase
msgText = tm.getText();
assertNull(msgText);
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(10000L);
assertNotNull(tm);
+ session1.commit();
msgText = tm.getText();
assertNull(msgText);
tm.clearBody();
tm.setText("Now we are not null");
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(2000);
assertNotNull(tm);
+ session1.commit();
msgText = tm.getText();
assertEquals("Now we are not null", msgText);
@@ -204,7 +219,9 @@ public class TopicSessionTest extends QpidTestCase
msgText = tm.getText();
assertEquals("Empty string not returned", "", msgText);
publisher.publish(tm);
+ session1.commit();
tm = (TextMessage) consumer1.receive(2000);
+ session1.commit();
assertNotNull(tm);
assertEquals("Empty string not returned", "", msgText);
con.close();
@@ -213,7 +230,7 @@ public class TopicSessionTest extends QpidTestCase
public void testSendingSameMessage() throws Exception
{
AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
TopicPublisher producer = session.createPublisher(topic);
@@ -221,14 +238,16 @@ public class TopicSessionTest extends QpidTestCase
conn.start();
TextMessage sentMessage = session.createTextMessage("Test Message");
producer.send(sentMessage);
+ session.commit();
TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(), receivedMessage.getText());
producer.send(sentMessage);
+ session.commit();
receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(), receivedMessage.getText());
-
+ session.commit();
conn.close();
}
@@ -236,17 +255,18 @@ public class TopicSessionTest extends QpidTestCase
public void testTemporaryTopic() throws Exception
{
AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
TopicPublisher producer = session.createPublisher(topic);
MessageConsumer consumer = session.createConsumer(topic);
conn.start();
producer.send(session.createTextMessage("hello"));
+ session.commit();
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
assertEquals("hello", tm.getText());
-
+ session.commit();
try
{
topic.delete();
@@ -291,7 +311,7 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "testNoLocal");
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber noLocal = session1.createSubscriber(topic, "", true);
TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false);
TopicSubscriber normal = session1.createSubscriber(topic);
@@ -304,15 +324,17 @@ public class TopicSessionTest extends QpidTestCase
//send message to all consumers
publisher.publish(session1.createTextMessage("hello-new2"));
-
+ session1.commit();
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
-
+ session1.commit();
+
//test selector subscriber doesn't message
m = (TextMessage) select.receive(1000);
assertNull(m);
-
+ session1.commit();
+
//test nolocal subscriber doesn't message
m = (TextMessage) noLocal.receive(1000);
if (m != null)
@@ -326,21 +348,24 @@ public class TopicSessionTest extends QpidTestCase
message.setStringProperty("Selector", "select");
publisher.publish(message);
-
+ session1.commit();
+
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
-
+ session1.commit();
+
//test selector subscriber does get message
m = (TextMessage) select.receive(1000);
assertNotNull(m);
+ session1.commit();
//test nolocal subscriber doesn't message
m = (TextMessage) noLocal.receive(100);
assertNull(m);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher2 = session2.createPublisher(topic);
@@ -348,14 +373,17 @@ public class TopicSessionTest extends QpidTestCase
message.setStringProperty("Selector", "select");
publisher2.publish(message);
+ session2.commit();
//test normal subscriber gets message
m = (TextMessage) normal.receive(1000);
assertNotNull(m);
+ session1.commit();
//test selector subscriber does get message
m = (TextMessage) select.receive(1000);
assertNotNull(m);
+ session1.commit();
//test nolocal subscriber does message
m = (TextMessage) noLocal.receive(100);
@@ -378,7 +406,7 @@ public class TopicSessionTest extends QpidTestCase
// Setup Topic
AMQTopic topic = new AMQTopic(con, "testNoLocal");
- TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
// Setup subscriber with selector
TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
@@ -391,13 +419,15 @@ public class TopicSessionTest extends QpidTestCase
// Send non-matching message
message = session.createTextMessage("non-matching 1");
publisher.publish(message);
+ session.commit();
// Send and consume matching message
message = session.createTextMessage("hello");
message.setStringProperty("Selector", "select");
publisher.publish(message);
-
+ session.commit();
+
m = (TextMessage) selector.receive(1000);
assertNotNull("should have received message", m);
assertEquals("Message contents were wrong", "hello", m.getText());
@@ -405,7 +435,8 @@ public class TopicSessionTest extends QpidTestCase
// Send non-matching message
message = session.createTextMessage("non-matching 2");
publisher.publish(message);
-
+ session.commit();
+
// Assert queue count is 0
long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
assertEquals("Queue depth was wrong", 0, depth);