summaryrefslogtreecommitdiff
path: root/java/client/test/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-12 16:32:47 +0000
committerGordon Sim <gsim@apache.org>2006-10-12 16:32:47 +0000
commit0629359c41c88ab1b7c14c96cb4735fbc25764f7 (patch)
tree16ffdf589ee283e569fea4232384ba7f19fba363 /java/client/test/src
parentaf5784ad936f12da595618a48214a83c64acdbfb (diff)
downloadqpid-python-0629359c41c88ab1b7c14c96cb4735fbc25764f7.tar.gz
Fixed ack.RecoverTest and ack.DisconnectAndRedeliverTest. These were failing due to a race condition
where the consumers queue was not bound by the time the publisher sent messages. This is a result of the use of nowait=true for the declare/bind/consume cycle for a BasicMessageConsumer. To work around this in tests like these that have two connections, one consuming & one publishing, I added a declareExchangeSynch() method to AMQSession which allows a thread to block until the session it invokes that method on has processed all the commands up to that point. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463309 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/test/src')
-rw-r--r--java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java13
-rw-r--r--java/client/test/src/org/apache/qpid/ack/RecoverTest.java9
2 files changed, 15 insertions, 7 deletions
diff --git a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
index ae057c0e82..68460682ec 100644
--- a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
+++ b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
@@ -22,6 +22,7 @@ import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -68,9 +69,11 @@ public class DisconnectAndRedeliverTest
TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -103,15 +106,15 @@ public class DisconnectAndRedeliverTest
con.start();
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg2");
+ Assert.assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg3");
+ Assert.assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg4");
+ Assert.assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging last message");
tm.acknowledge();
@@ -157,6 +160,8 @@ public class DisconnectAndRedeliverTest
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
diff --git a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
index cd7c43d4a1..22b7b41140 100644
--- a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
+++ b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.ack;
import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@ public class RecoverTest
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -80,13 +83,13 @@ public class RecoverTest
// no ack for last three messages so when I call recover I expect to get three messages back
consumerSession.recover();
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg2");
+ Assert.assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg3");
+ Assert.assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg4");
+ Assert.assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging last message");
tm.acknowledge();