summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java78
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java22
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java22
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java6
8 files changed, 115 insertions, 29 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
index 01c3d30314..0739acfabd 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -126,7 +126,7 @@ public class MessageListenerTest extends TestCase implements MessageListener
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- assertTrue(_consumer.receive() != null);
+ assertTrue(_consumer.receive(2000) != null);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 2d69b4fb82..4a8c0145c4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -29,21 +29,28 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.exchange.ExchangeDefaults;
import javax.jms.*;
+import java.util.concurrent.atomic.AtomicInteger;
public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
+ private Exception _error;
+ private AtomicInteger count;
+
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
+ _error = null;
+ count = new AtomicInteger();
}
protected void tearDown() throws Exception
{
super.tearDown();
TransportConnection.killAllVMBrokers();
+ count = null;
}
@@ -212,38 +219,93 @@ public class RecoverTest extends TestCase
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
- MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+
+ final Object lock = new Object();
+
consumer.setMessageListener(new MessageListener()
{
- private int count = 0;
+
+
public void onMessage(Message message)
{
try
{
- if (count++ == 0)
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- assertFalse(message.getJMSRedelivered());
+ if(message.getJMSRedelivered())
+ {
+ setError(new Exception("Message marked as redilvered on what should be first delivery attempt"));
+ }
consumerSession.recover();
}
- else if (count++ == 1)
+ else if (count.get() == 2)
{
- assertTrue(message.getJMSRedelivered());
+ if(!message.getJMSRedelivered())
+ {
+ setError(new Exception("Message not marked as redilvered on what should be second delivery attempt"));
+ }
}
else
{
- fail("Message delivered too many times!");
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
}
catch (JMSException e)
{
_logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
+ synchronized(lock)
+ {
+ lock.notify();
}
}
});
+
+ con.start();
+
+ long waitTime = 300000L;
+ long waitUntilTime = System.currentTimeMillis() + waitTime;
+
+ synchronized(lock)
+ {
+ while((count.get() <= 1) && (waitTime > 0))
+ {
+ lock.wait(waitTime);
+ if(count.get() <= 1)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ }
+
+ Thread.sleep(1000);
+
+ if(count.get() != 2)
+ {
+ System.err.println("Count != 2 : " + count);
+ }
+ assertTrue(count.get() == 2);
+
+ con.close();
+
+ if(_error != null)
+ {
+ throw _error;
+ }
+ }
+
+ private void setError(Exception e)
+ {
+ _error = e;
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index 0d283aa0d9..29770704c5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -144,13 +144,29 @@ public class MapMessageTest extends TestCase implements MessageListener
}
- void waitFor(int count) throws InterruptedException
+ void waitFor(int count) throws Exception
{
+ long waitTime = 30000L;
+ long waitUntilTime = System.currentTimeMillis() + 30000L;
+
+
synchronized(received)
{
- while (received.size() < count)
+ while(received.size() < count && waitTime>0)
+ {
+ if (received.size() < count)
+ {
+ received.wait(waitTime);
+ }
+
+ if (received.size() < count)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ if (received.size() < count)
{
- received.wait();
+ throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size());
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 8441799990..d401690148 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -48,14 +48,15 @@ public class ConnectionTest extends TestCase
protected void tearDown() throws Exception
{
- TransportConnection.killAllVMBrokers();
+ TransportConnection.killVMBroker(1);
}
public void testSimpleConnection()
{
try
{
- new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ conn.close();
}
catch (Exception e)
{
@@ -94,6 +95,7 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe);
}
}
+
}
public void testUnresolvedHostFailure() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index f12400c7b1..db0d3e0eab 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -65,23 +65,29 @@ public class Client implements MessageListener
_connection.close();
}
- public void onMessage(Message response)
+ public synchronized void onMessage(Message response)
{
+
System.out.println("Received " + (++_count) + " of " + _expected + " responses.");
if(_count == _expected)
{
- synchronized(this)
- {
- notifyAll();
- }
+
+ notifyAll();
}
+
+
}
- synchronized void waitUntilComplete() throws InterruptedException
+ synchronized void waitUntilComplete() throws Exception
{
- while(_count < _expected)
+
+ if(_count < _expected)
+ {
+ wait(10000L);
+ }
+ if(_count < _expected)
{
- wait();
+ throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index 0710605db9..0e4603ed24 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -101,6 +101,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
}
catch (Exception e)
{
+ e.printStackTrace();
fail("This Test should succeed but failed due to: " + e);
}
finally
@@ -236,7 +237,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
public void onMessage(Message message)
{
- received++;
+
try
{
if (message instanceof ObjectMessage)
@@ -255,13 +256,11 @@ public class ObjectMessageTest extends TestCase implements MessageListener
items.add(e);
}
- if (waiting)
- {
synchronized(this)
{
+ received++;
notify();
}
- }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 84c7a61a56..8e883a2184 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -219,6 +219,7 @@ public class TopicSessionTest extends TestCase
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(),receivedMessage.getText());
+ conn.close();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 9805c48db1..f8b3b28845 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -84,7 +84,7 @@ public class TransactedTest extends TestCase
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer2 = testSession.createConsumer(queue2);
- testCon.start();
+
}
protected void tearDown() throws Exception
@@ -108,7 +108,7 @@ public class TransactedTest extends TestCase
//commit
session.commit();
-
+ testCon.start();
//ensure sent messages can be received and received messages are gone
expect("X", testConsumer2.receive(1000));
expect("Y", testConsumer2.receive(1000));
@@ -135,7 +135,7 @@ public class TransactedTest extends TestCase
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
-
+ testCon.start();
testConsumer1 = testSession.createConsumer(queue1);
assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));