From e9b31c99401eef29ece619929f2ad269e6f6c2a9 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Mon, 29 Jan 2007 11:07:25 +0000 Subject: QPID-322 : Patch supplied by Rob Godfrey - Tests may hang instead of fail if message does not get through git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501007 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/codec/BasicDeliverTest.java | 10 +-- .../java/org/apache/qpid/codec/Client.java | 4 +- .../java/org/apache/qpid/headers/Listener.java | 8 +-- .../java/org/apache/qpid/headers/Publisher.java | 8 +-- .../referenceabletest/JNDIReferenceableTest.java | 5 +- .../apache/qpid/client/MessageListenerTest.java | 2 +- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 78 +++++++++++++++++++--- .../qpid/test/unit/basic/MapMessageTest.java | 22 +++++- .../unit/client/connection/ConnectionTest.java | 6 +- .../qpid/test/unit/client/forwardall/Client.java | 22 +++--- .../unit/client/message/ObjectMessageTest.java | 7 +- .../qpid/test/unit/topic/TopicSessionTest.java | 1 + .../qpid/test/unit/transacted/TransactedTest.java | 6 +- 13 files changed, 133 insertions(+), 46 deletions(-) diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java index ad0a6bddae..1db7e200bd 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java @@ -240,10 +240,7 @@ public class BasicDeliverTest static AMQFrame wrapBody(AMQBody body) { - AMQFrame frame = new AMQFrame(); - frame.bodyFrame = body; - frame.channel = 1; - + AMQFrame frame = new AMQFrame(1, body); return frame; } @@ -269,7 +266,10 @@ public class BasicDeliverTest static BasicDeliverBody createBasicDeliverBody() { - BasicDeliverBody body = new BasicDeliverBody((byte) 8, (byte) 0, new AMQShortString("myConsumerTag"), 1, + BasicDeliverBody body = new BasicDeliverBody((byte) 8, (byte) 0, + BasicDeliverBody.getClazz((byte) 8, (byte) 0), + BasicDeliverBody.getMethod((byte) 8, (byte) 0), + new AMQShortString("myConsumerTag"), 1, new AMQShortString("myExchange"), false, new AMQShortString("myRoutingKey")); return body; diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java index c0de5ab133..3886021277 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/codec/Client.java @@ -106,12 +106,12 @@ public class Client extends IoHandlerAdapter private static boolean isDeliver(Object o) { - return o instanceof AMQFrame && ((AMQFrame) o).bodyFrame instanceof BasicDeliverBody; + return o instanceof AMQFrame && ((AMQFrame) o).getBodyFrame() instanceof BasicDeliverBody; } private static boolean isContent(Object o) { - return o instanceof AMQFrame && ((AMQFrame) o).bodyFrame instanceof ContentBody; + return o instanceof AMQFrame && ((AMQFrame) o).getBodyFrame() instanceof ContentBody; } public static void main(String[] argv) throws Exception diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java index d97fc22a35..cb5caefc1e 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Listener.java @@ -23,7 +23,7 @@ package org.apache.qpid.headers; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.jms.Session; -import org.apache.qpid.testutil.Config; +//import org.apache.qpid.testutil.Config; import javax.jms.MessageListener; import javax.jms.Message; @@ -31,9 +31,9 @@ import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.JMSException; -public class Listener implements MessageListener +public class Listener //implements MessageListener { - private final AMQConnection _connection; +/* private final AMQConnection _connection; private final MessageProducer _controller; private final AMQSession _session; private final MessageFactory _factory; @@ -113,5 +113,5 @@ public class Listener implements MessageListener config.setName("test_headers_exchange"); config.setOptions(argv); new Listener((AMQConnection) config.getConnection(), config.getDestination()); - } + }*/ } diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java index a4ac5f670d..d9ef702c48 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/headers/Publisher.java @@ -22,13 +22,13 @@ package org.apache.qpid.headers; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.testutil.Config; +//import org.apache.qpid.testutil.Config; import javax.jms.*; -public class Publisher implements MessageListener +public class Publisher // implements MessageListener { - private final Object _lock = new Object(); +/* private final Object _lock = new Object(); private final AMQConnection _connection; private final AMQSession _session; private final Destination _exchange; @@ -129,5 +129,5 @@ public class Publisher implements MessageListener new Publisher(config).test(msgCount, consumerCount); } - } + }*/ } diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java index 8272b13c1d..9fc186f19a 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java @@ -21,7 +21,7 @@ package org.apache.qpid.test.unit.jndi.referenceabletest; import junit.framework.TestCase; -import org.apache.qpid.testutil.VMBrokerSetup; +//import org.apache.qpid.testutil.VMBrokerSetup; import javax.naming.NameAlreadyBoundException; import javax.naming.NoInitialContextException; @@ -38,7 +38,7 @@ import javax.naming.NoInitialContextException; */ public class JNDIReferenceableTest extends TestCase { - // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84 +/* // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84 public void testReferenceable() { Bind b = null; @@ -98,4 +98,5 @@ public class JNDIReferenceableTest extends TestCase { return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class)); } + */ } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 01c3d30314..0739acfabd 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -126,7 +126,7 @@ public class MessageListenerTest extends TestCase implements MessageListener for (int msg = 0; msg < MSG_COUNT; msg++) { - assertTrue(_consumer.receive() != null); + assertTrue(_consumer.receive(2000) != null); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 2d69b4fb82..4a8c0145c4 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -29,21 +29,28 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.exchange.ExchangeDefaults; import javax.jms.*; +import java.util.concurrent.atomic.AtomicInteger; public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); + private Exception _error; + private AtomicInteger count; + protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + _error = null; + count = new AtomicInteger(); } protected void tearDown() throws Exception { super.tearDown(); TransportConnection.killAllVMBrokers(); + count = null; } @@ -212,38 +219,93 @@ public class RecoverTest extends TestCase Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); - MessageConsumer consumer = consumerSession.createConsumer(queue); + + + final Object lock = new Object(); + consumer.setMessageListener(new MessageListener() { - private int count = 0; + + public void onMessage(Message message) { try { - if (count++ == 0) + count.incrementAndGet(); + if (count.get() == 1) { - assertFalse(message.getJMSRedelivered()); + if(message.getJMSRedelivered()) + { + setError(new Exception("Message marked as redilvered on what should be first delivery attempt")); + } consumerSession.recover(); } - else if (count++ == 1) + else if (count.get() == 2) { - assertTrue(message.getJMSRedelivered()); + if(!message.getJMSRedelivered()) + { + setError(new Exception("Message not marked as redilvered on what should be second delivery attempt")); + } } else { - fail("Message delivered too many times!"); + System.err.println(message); + fail("Message delivered too many times!: " + count); } } catch (JMSException e) { _logger.error("Error recovering session: " + e, e); + setError(e); + } + synchronized(lock) + { + lock.notify(); } } }); + + con.start(); + + long waitTime = 300000L; + long waitUntilTime = System.currentTimeMillis() + waitTime; + + synchronized(lock) + { + while((count.get() <= 1) && (waitTime > 0)) + { + lock.wait(waitTime); + if(count.get() <= 1) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + } + + Thread.sleep(1000); + + if(count.get() != 2) + { + System.err.println("Count != 2 : " + count); + } + assertTrue(count.get() == 2); + + con.close(); + + if(_error != null) + { + throw _error; + } + } + + private void setError(Exception e) + { + _error = e; } public static junit.framework.Test suite() diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index 0d283aa0d9..29770704c5 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -144,13 +144,29 @@ public class MapMessageTest extends TestCase implements MessageListener } - void waitFor(int count) throws InterruptedException + void waitFor(int count) throws Exception { + long waitTime = 30000L; + long waitUntilTime = System.currentTimeMillis() + 30000L; + + synchronized(received) { - while (received.size() < count) + while(received.size() < count && waitTime>0) + { + if (received.size() < count) + { + received.wait(waitTime); + } + + if (received.size() < count) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + if (received.size() < count) { - received.wait(); + throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size()); } } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 8441799990..d401690148 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -48,14 +48,15 @@ public class ConnectionTest extends TestCase protected void tearDown() throws Exception { - TransportConnection.killAllVMBrokers(); + TransportConnection.killVMBroker(1); } public void testSimpleConnection() { try { - new AMQConnection(_broker, "guest", "guest", "fred", "test"); + AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + conn.close(); } catch (Exception e) { @@ -94,6 +95,7 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } } + } public void testUnresolvedHostFailure() throws Exception diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index f12400c7b1..db0d3e0eab 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -65,23 +65,29 @@ public class Client implements MessageListener _connection.close(); } - public void onMessage(Message response) + public synchronized void onMessage(Message response) { + System.out.println("Received " + (++_count) + " of " + _expected + " responses."); if(_count == _expected) { - synchronized(this) - { - notifyAll(); - } + + notifyAll(); } + + } - synchronized void waitUntilComplete() throws InterruptedException + synchronized void waitUntilComplete() throws Exception { - while(_count < _expected) + + if(_count < _expected) + { + wait(10000L); + } + if(_count < _expected) { - wait(); + throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index 0710605db9..0e4603ed24 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -101,6 +101,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener } catch (Exception e) { + e.printStackTrace(); fail("This Test should succeed but failed due to: " + e); } finally @@ -236,7 +237,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener public void onMessage(Message message) { - received++; + try { if (message instanceof ObjectMessage) @@ -255,13 +256,11 @@ public class ObjectMessageTest extends TestCase implements MessageListener items.add(e); } - if (waiting) - { synchronized(this) { + received++; notify(); } - } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 84c7a61a56..8e883a2184 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -219,6 +219,7 @@ public class TopicSessionTest extends TestCase assertNotNull(receivedMessage); assertEquals(sentMessage.getText(),receivedMessage.getText()); + conn.close(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 9805c48db1..f8b3b28845 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -84,7 +84,7 @@ public class TransactedTest extends TestCase testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer2 = testSession.createConsumer(queue2); - testCon.start(); + } protected void tearDown() throws Exception @@ -108,7 +108,7 @@ public class TransactedTest extends TestCase //commit session.commit(); - + testCon.start(); //ensure sent messages can be received and received messages are gone expect("X", testConsumer2.receive(1000)); expect("Y", testConsumer2.receive(1000)); @@ -135,7 +135,7 @@ public class TransactedTest extends TestCase expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); - + testCon.start(); testConsumer1 = testSession.createConsumer(queue1); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); -- cgit v1.2.1