diff options
Diffstat (limited to 'java/client/src/test')
8 files changed, 115 insertions, 29 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 01c3d30314..0739acfabd 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -126,7 +126,7 @@ public class MessageListenerTest extends TestCase implements MessageListener for (int msg = 0; msg < MSG_COUNT; msg++) { - assertTrue(_consumer.receive() != null); + assertTrue(_consumer.receive(2000) != null); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 2d69b4fb82..4a8c0145c4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -29,21 +29,28 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.exchange.ExchangeDefaults; import javax.jms.*; +import java.util.concurrent.atomic.AtomicInteger; public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); + private Exception _error; + private AtomicInteger count; + protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + _error = null; + count = new AtomicInteger(); } protected void tearDown() throws Exception { super.tearDown(); TransportConnection.killAllVMBrokers(); + count = null; } @@ -212,38 +219,93 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); - MessageConsumer consumer = consumerSession.createConsumer(queue); + + + final Object lock = new Object(); + consumer.setMessageListener(new MessageListener() { - private int count = 0; + + public void onMessage(Message message) { try { - if (count++ == 0) + count.incrementAndGet(); + if (count.get() == 1) { - assertFalse(message.getJMSRedelivered()); + if(message.getJMSRedelivered()) + { + setError(new Exception("Message marked as redilvered on what should be first delivery attempt")); + } consumerSession.recover(); } - else if (count++ == 1) + else if (count.get() == 2) { - assertTrue(message.getJMSRedelivered()); + if(!message.getJMSRedelivered()) + { + setError(new Exception("Message not marked as redilvered on what should be second delivery attempt")); + } } else { - fail("Message delivered too many times!"); + System.err.println(message); + fail("Message delivered too many times!: " + count); } } catch (JMSException e) { _logger.error("Error recovering session: " + e, e); + setError(e); + } + synchronized(lock) + { + lock.notify(); } } }); + + con.start(); + + long waitTime = 300000L; + long waitUntilTime = System.currentTimeMillis() + waitTime; + + synchronized(lock) + { + while((count.get() <= 1) && (waitTime > 0)) + { + lock.wait(waitTime); + if(count.get() <= 1) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + } + + Thread.sleep(1000); + + if(count.get() != 2) + { + System.err.println("Count != 2 : " + count); + } + assertTrue(count.get() == 2); + + con.close(); + + if(_error != null) + { + throw _error; + } + } + + private void setError(Exception e) + { + _error = e; } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 0d283aa0d9..29770704c5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -144,13 +144,29 @@ public class MapMessageTest extends TestCase implements MessageListener } - void waitFor(int count) throws InterruptedException + void waitFor(int count) throws Exception { + long waitTime = 30000L; + long waitUntilTime = System.currentTimeMillis() + 30000L; + + synchronized(received) { - while (received.size() < count) + while(received.size() < count && waitTime>0) + { + if (received.size() < count) + { + received.wait(waitTime); + } + + if (received.size() < count) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + if (received.size() < count) { - received.wait(); + throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size()); } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 8441799990..d401690148 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -48,14 +48,15 @@ public class ConnectionTest extends TestCase protected void tearDown() throws Exception { - TransportConnection.killAllVMBrokers(); + TransportConnection.killVMBroker(1); } public void testSimpleConnection() { try { - new AMQConnection(_broker, "guest", "guest", "fred", "test"); + AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + conn.close(); } catch (Exception e) { @@ -94,6 +95,7 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } } + } public void testUnresolvedHostFailure() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index f12400c7b1..db0d3e0eab 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -65,23 +65,29 @@ public class Client implements MessageListener _connection.close(); } - public void onMessage(Message response) + public synchronized void onMessage(Message response) { + System.out.println("Received " + (++_count) + " of " + _expected + " responses."); if(_count == _expected) { - synchronized(this) - { - notifyAll(); - } + + notifyAll(); } + + } - synchronized void waitUntilComplete() throws InterruptedException + synchronized void waitUntilComplete() throws Exception { - while(_count < _expected) + + if(_count < _expected) + { + wait(10000L); + } + if(_count < _expected) { - wait(); + throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 0710605db9..0e4603ed24 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -101,6 +101,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener } catch (Exception e) { + e.printStackTrace(); fail("This Test should succeed but failed due to: " + e); } finally @@ -236,7 +237,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener public void onMessage(Message message) { - received++; + try { if (message instanceof ObjectMessage) @@ -255,13 +256,11 @@ public class ObjectMessageTest extends TestCase implements MessageListener items.add(e); } - if (waiting) - { synchronized(this) { + received++; notify(); } - } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 84c7a61a56..8e883a2184 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -219,6 +219,7 @@ public class TopicSessionTest extends TestCase assertNotNull(receivedMessage); assertEquals(sentMessage.getText(),receivedMessage.getText()); + conn.close(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 9805c48db1..f8b3b28845 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -84,7 +84,7 @@ public class TransactedTest extends TestCase testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer2 = testSession.createConsumer(queue2); - testCon.start(); + } protected void tearDown() throws Exception @@ -108,7 +108,7 @@ public class TransactedTest extends TestCase //commit session.commit(); - + testCon.start(); //ensure sent messages can be received and received messages are gone expect("X", testConsumer2.receive(1000)); expect("Y", testConsumer2.receive(1000)); @@ -135,7 +135,7 @@ public class TransactedTest extends TestCase expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); - + testCon.start(); testConsumer1 = testSession.createConsumer(queue1); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); |