summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java54
-rw-r--r--java/client/test/src/org/apache/qpid/ack/RecoverTest.java22
-rw-r--r--java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java10
-rw-r--r--java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java8
-rw-r--r--java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java11
-rw-r--r--java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java8
-rw-r--r--java/client/test/src/org/apache/qpid/basic/ReceiveTest.java17
-rw-r--r--java/client/test/src/org/apache/qpid/basic/SessionStartTest.java14
-rw-r--r--java/client/test/src/org/apache/qpid/basic/TextMessageTest.java9
-rw-r--r--java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java6
-rw-r--r--java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java7
-rw-r--r--java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java56
-rw-r--r--java/client/test/src/org/apache/qpid/cluster/Client.java4
-rw-r--r--java/client/test/src/org/apache/qpid/connection/TestManyConnections.java5
-rw-r--r--java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java270
-rw-r--r--java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java55
-rw-r--r--java/client/test/src/org/apache/qpid/forwardall/Combined.java13
-rw-r--r--java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java32
-rw-r--r--java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java32
-rw-r--r--java/client/test/src/org/apache/qpid/transacted/TransactedTest.java24
-rw-r--r--java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java59
-rw-r--r--java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java76
22 files changed, 509 insertions, 283 deletions
diff --git a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
index 254b3d43ae..1dccf2f336 100644
--- a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
+++ b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
@@ -20,17 +20,20 @@ package org.apache.qpid.ack;
import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
import javax.jms.*;
-public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
+public class DisconnectAndRedeliverTest
{
private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
@@ -40,21 +43,30 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
DOMConfigurator.configure("broker/etc/log4j.xml");
}
+ @After
+ public void stopVmBroker()
+ {
+ TransportConnection.killVMBroker(1);
+ }
+
/**
* This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
* the channel is closed.
+ *
* @throws Exception
*/
@Test
public void disconnectRedeliversMessages() throws Exception
{
- Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -64,6 +76,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
producer.send(producerSession.createTextMessage("msg3"));
producer.send(producerSession.createTextMessage("msg4"));
+ con2.close();
+
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
@@ -75,7 +89,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
_logger.info("Received all four messages. About to disconnect and reconnect");
con.close();
- con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(queue);
@@ -85,9 +99,11 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
tm = (TextMessage) consumer.receive(3000);
Assert.assertEquals(tm.getText(), "msg2");
+
tm = (TextMessage) consumer.receive(3000);
Assert.assertEquals(tm.getText(), "msg3");
+
tm = (TextMessage) consumer.receive(3000);
Assert.assertEquals(tm.getText(), "msg4");
@@ -96,7 +112,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
con.close();
- con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(queue);
_logger.info("Starting third consumer connection");
@@ -105,23 +121,38 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
Assert.assertNull(tm);
_logger.info("No messages redelivered as is expected");
con.close();
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ consumer = consumerSession.createConsumer(queue);
+ _logger.info("Starting fourth consumer connection");
+ con.start();
+ tm = (TextMessage) consumer.receive(3000);
+ Assert.assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+ con.close();
+
+ _logger.info("Actually:" + store.getMessageMap().size());
+ // Assert.assertTrue(store.getMessageMap().size() == 0);
}
/**
* Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
* requeued (due perhaps to the queue being deleted).
+ *
* @throws Exception
*/
@Test
public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception
{
- Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -131,6 +162,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
producer.send(producerSession.createTextMessage("msg3"));
producer.send(producerSession.createTextMessage("msg4"));
+ con2.close();
+
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
@@ -142,7 +175,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
_logger.info("Received all four messages. About to disconnect and reconnect");
con.close();
- con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(queue);
@@ -152,7 +185,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase
tm = (TextMessage) consumer.receiveNoWait();
Assert.assertNull(tm);
_logger.info("No messages redelivered as is expected");
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
+
+ _logger.info("Actually:" + store.getMessageMap().size());
Assert.assertTrue(store.getMessageMap().size() == 0);
con.close();
}
diff --git a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
index 0ab40aa67c..cd7c43d4a1 100644
--- a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
+++ b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
@@ -20,15 +20,18 @@ package org.apache.qpid.ack;
import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
import javax.jms.*;
-public class RecoverTest extends VmOrRemoteTestCase
+public class RecoverTest
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
@@ -38,16 +41,22 @@ public class RecoverTest extends VmOrRemoteTestCase
DOMConfigurator.configure("broker/etc/log4j.xml");
}
+ @After
+ public void stopVmBroker()
+ {
+ TransportConnection.killVMBroker(1);
+ }
+
@Test
public void recoverResendsMsgs() throws Exception
{
- Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -57,6 +66,8 @@ public class RecoverTest extends VmOrRemoteTestCase
producer.send(producerSession.createTextMessage("msg3"));
producer.send(producerSession.createTextMessage("msg4"));
+ con2.close();
+
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
@@ -87,6 +98,9 @@ public class RecoverTest extends VmOrRemoteTestCase
tm = (TextMessage) consumer.receiveNoWait();
Assert.assertNull(tm);
_logger.info("No messages redelivered as is expected");
+
+ con.close();
+
}
public static junit.framework.Test suite()
diff --git a/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java b/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
index a5cda66982..e8a9debe1d 100644
--- a/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.apache.mina.common.ByteBuffer;
import org.junit.Test;
@@ -32,7 +31,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListener
+public class BytesMessageTest implements MessageListener
{
private Connection _connection;
private Destination _destination;
@@ -40,10 +39,11 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe
private final List<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
private final List<byte[]> messages = new ArrayList<byte[]>();
private int _count = 100;
+ public String _connectionString = "vm://:1";
void init() throws Exception
{
- init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
void init(AMQConnection connection) throws Exception
@@ -182,7 +182,7 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe
final int count;
if (argv.length == 0)
{
- connectionString = "localhost:5672";
+ connectionString = "vm://:1";
count = 100;
}
else
@@ -195,7 +195,7 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe
System.out.println("count = " + count);
BytesMessageTest test = new BytesMessageTest();
- test.setConnectionString(connectionString);
+ test._connectionString = connectionString;
test._count = count;
test.test();
}
diff --git a/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java b/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
index 6417544d01..82e43773a4 100644
--- a/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableTest;
@@ -36,7 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
-public class FieldTableMessageTest extends VmOrRemoteTestCase implements MessageListener
+public class FieldTableMessageTest implements MessageListener
{
private AMQConnection _connection;
private AMQDestination _destination;
@@ -44,11 +43,12 @@ public class FieldTableMessageTest extends VmOrRemoteTestCase implements Message
private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>();
private FieldTable _expected;
private int _count = 10;
+ public String _connectionString = "vm://:1";
@Before
public void init() throws Exception
{
- init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
private void init(AMQConnection connection) throws Exception
@@ -143,7 +143,7 @@ public class FieldTableMessageTest extends VmOrRemoteTestCase implements Message
public static void main(String[] argv) throws Exception
{
FieldTableMessageTest test = new FieldTableMessageTest();
- test.setConnectionString((argv.length == 0 ? "localhost:5672" : argv[0]));
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
test.init();
test._count = argv.length > 1 ? Integer.parseInt(argv[1]) : 5;
test.test();
diff --git a/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java b/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
index 28ec516243..8bf6fc1991 100644
--- a/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java
@@ -22,7 +22,6 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.junit.Test;
import javax.jms.*;
@@ -30,8 +29,10 @@ import javax.jms.*;
/**
* This is a slow test.
*/
-public class MultipleConnectionTest extends VmOrRemoteTestCase
+public class MultipleConnectionTest
{
+ public static String _connectionString="vm://:1";
+
private static class Receiver
{
private AMQConnection _connection;
@@ -158,20 +159,20 @@ public class MultipleConnectionTest extends VmOrRemoteTestCase
public static void main(String[] argv) throws Exception
{
- String broker = argv.length > 0 ? argv[0] : "localhost:5672";
+ String broker = argv.length > 0 ? argv[0] : "vm://:1";
int connections = 7;
int sessions = 2;
MultipleConnectionTest test = new MultipleConnectionTest();
- test.setConnectionString(broker);
+ test._connectionString = broker;
test.test();
}
@Test
public void test() throws Exception
{
- String broker = getConnectionString();
+ String broker = _connectionString;
int messages = 10;
AMQTopic topic = new AMQTopic("amq.topic");
diff --git a/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java b/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
index d4e1315ccb..a78cd6f72b 100644
--- a/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.junit.Before;
import org.junit.Test;
@@ -36,7 +35,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageListener
+public class ObjectMessageTest implements MessageListener
{
private AMQConnection _connection;
private AMQDestination _destination;
@@ -44,11 +43,12 @@ public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageList
private final List<JMSObjectMessage> received = new ArrayList<JMSObjectMessage>();
private final List<Payload> messages = new ArrayList<Payload>();
private int _count = 100;
+ public String _connectionString = "vm://:1";
@Before
public void init() throws Exception
{
- String broker = getConnectionString();
+ String broker = _connectionString;
init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
}
@@ -190,7 +190,7 @@ public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageList
public static void main(String[] argv) throws Exception
{
ObjectMessageTest test = new ObjectMessageTest();
- test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
test.init();
if (argv.length > 1)
{
diff --git a/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java b/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
index e1ef686d32..5b7db2b77c 100644
--- a/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java
@@ -22,30 +22,25 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+
import org.junit.Before;
import org.junit.Test;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
import javax.jms.MessageConsumer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-public class ReceiveTest extends VmOrRemoteTestCase
+public class ReceiveTest
{
private AMQConnection _connection;
private AMQDestination _destination;
private AMQSession _session;
private MessageConsumer _consumer;
+ public String _connectionString = "vm://:1";
+
@Before
public void init() throws Exception
{
- String broker = getConnectionString();
+ String broker = _connectionString;
init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path"));
}
@@ -73,7 +68,7 @@ public class ReceiveTest extends VmOrRemoteTestCase
public static void main(String[] argv) throws Exception
{
ReceiveTest test = new ReceiveTest();
- test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
test.init();
test.test();
}
diff --git a/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java b/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
index e9ce8ab7c3..a5f7679ffd 100644
--- a/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java
@@ -22,26 +22,30 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
+
import org.junit.Before;
import org.junit.Test;
+import org.junit.Assert;
+import org.junit.After;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-public class SessionStartTest extends VmOrRemoteTestCase implements MessageListener
+public class SessionStartTest implements MessageListener
{
private AMQConnection _connection;
private AMQDestination _destination;
private AMQSession _session;
private int count;
+ public String _connectionString = "vm://:1";
@Before
public void init() throws Exception
{
- String broker = getConnectionString();
- init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
private void init(AMQConnection connection) throws Exception
@@ -97,7 +101,7 @@ public class SessionStartTest extends VmOrRemoteTestCase implements MessageListe
public static void main(String[] argv) throws Exception
{
SessionStartTest test = new SessionStartTest();
- test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
test.init();
test.test();
}
diff --git a/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java b/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
index a4e2cfd2a1..4f4bbb3028 100644
--- a/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
+++ b/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java
@@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.junit.Before;
import org.junit.Test;
@@ -35,7 +34,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-public class TextMessageTest extends VmOrRemoteTestCase implements MessageListener
+public class TextMessageTest implements MessageListener
{
private AMQConnection _connection;
private AMQDestination _destination;
@@ -43,12 +42,12 @@ public class TextMessageTest extends VmOrRemoteTestCase implements MessageListen
private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
private final List<String> messages = new ArrayList<String>();
private int _count = 100;
+ public String _connectionString ="vm://:1";
@Before
public void init() throws Exception
{
- String broker = getConnectionString();
- init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
}
private void init(AMQConnection connection) throws Exception
@@ -164,7 +163,7 @@ public class TextMessageTest extends VmOrRemoteTestCase implements MessageListen
public static void main(String[] argv) throws Exception
{
TextMessageTest test = new TextMessageTest();
- test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]);
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
test.init();
if (argv.length > 1) test._count = Integer.parseInt(argv[1]);
test.test();
diff --git a/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java b/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
index 678261097f..be384f42d8 100644
--- a/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java
@@ -20,7 +20,6 @@ package org.apache.qpid.client.channelclose;
import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.apache.log4j.Logger;
import org.junit.After;
import static org.junit.Assert.assertEquals;
@@ -45,7 +44,7 @@ import java.util.List;
* 3. Since client does not have an exception listener, currently all sessions are
* closed.
*/
-public class ChannelCloseOkTest extends VmOrRemoteTestCase
+public class ChannelCloseOkTest
{
private Connection _connection;
private Destination _destination1;
@@ -56,11 +55,12 @@ public class ChannelCloseOkTest extends VmOrRemoteTestCase
private final List<Message> _received2 = new ArrayList<Message>();
private final static Logger _log = Logger.getLogger(ChannelCloseOkTest.class);
+ public String _connectionString = "vm://:1";
@Before
public void init() throws Exception
{
- _connection = new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path");
+ _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path");
_destination1 = new AMQQueue("q1", true);
_destination2 = new AMQQueue("q2", true);
_session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java b/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java
index 30eb115a7d..a5e1a0d558 100644
--- a/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java
+++ b/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java
@@ -21,6 +21,8 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.junit.Before;
+import org.junit.Test;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
@@ -31,6 +33,8 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.ArrayList;
+import junit.framework.JUnit4TestAdapter;
+
public class ObjectMessageTest implements MessageListener
{
private final AMQConnection connection;
@@ -181,7 +185,7 @@ public class ObjectMessageTest implements MessageListener
public static void main(String[] argv) throws Exception
{
- String broker = argv.length > 0 ? argv[0] : "localhost:5672";
+ String broker = argv.length > 0 ? argv[0] : "vm://:1";
if("-help".equals(broker))
{
System.out.println("Usage: <broker>");
@@ -244,4 +248,5 @@ public class ObjectMessageTest implements MessageListener
{
return in + System.currentTimeMillis();
}
+
}
diff --git a/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java b/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java
deleted file mode 100644
index 536f93a387..0000000000
--- a/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.client.testutil;
-
-import org.apache.qpid.vmbroker.VmPipeBroker;
-import org.junit.After;
-import org.junit.Before;
-
-public class VmOrRemoteTestCase
-{
- String _connectionString = "vm:1";
-
- VmPipeBroker _vmBroker;
-
- public boolean isVm() {
- return "vm:1".equals(_connectionString);
- }
-
- public void setConnectionString(final String connectionString) {
- this._connectionString = connectionString;
- }
-
- public String getConnectionString()
- {
- return _connectionString;
- }
-
- @Before
- public void startVmBroker() throws Exception {
- if (isVm()) {
- _vmBroker = new VmPipeBroker();
- _vmBroker.initialiseBroker();
- }
- }
-
- @After
- public void stopVmBroker() {
- _vmBroker.killBroker();
- }
-
-}
diff --git a/java/client/test/src/org/apache/qpid/cluster/Client.java b/java/client/test/src/org/apache/qpid/cluster/Client.java
index 9156d08060..1d12f157b9 100644
--- a/java/client/test/src/org/apache/qpid/cluster/Client.java
+++ b/java/client/test/src/org/apache/qpid/cluster/Client.java
@@ -117,6 +117,8 @@ public class Client
public static void main(String[] argv) throws AMQException, JMSException, InterruptedException, URLSyntaxException
{
//assume args describe the set of brokers to try
- new Client(new AMQConnection(argv[0], "guest", "guest", argv[1], "/test"), argv[1]);
+
+ String clientName = argv.length > 1 ? argv[1] : "testClient";
+ new Client(new AMQConnection(argv.length > 0 ? argv[0] : "vm://:1", "guest", "guest", clientName, "/test"), clientName);
}
}
diff --git a/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java b/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java
index 8552deb8fc..4456037c2e 100644
--- a/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java
+++ b/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java
@@ -21,11 +21,10 @@ import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.AMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.apache.log4j.Logger;
import org.junit.Test;
-public class TestManyConnections extends VmOrRemoteTestCase
+public class TestManyConnections
{
private static final Logger _log = Logger.getLogger(TestManyConnections.class);
@@ -44,7 +43,7 @@ public class TestManyConnections extends VmOrRemoteTestCase
long startTime = System.currentTimeMillis();
for (int i = 0; i < count; i++)
{
- createConnection(i, "tcp://foo", "myClient" + i, "guest", "guest", "/test");
+ createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "/test");
}
long endTime = System.currentTimeMillis();
_log.info("Time to create " + count + " connections: " + (endTime - startTime) +
diff --git a/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java b/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java
new file mode 100644
index 0000000000..6f5c5021fa
--- /dev/null
+++ b/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java
@@ -0,0 +1,270 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.failover;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
+
+public class FailoverBrokerTester implements Runnable
+{
+ private static final Logger _logger = Logger.getLogger(FailoverBrokerTester.class);
+
+ private int[] _brokers;
+ private int[] _brokersKilling;
+ private long _delayBeforeKillingStart;
+ private long _delayBetweenCullings;
+ private long _delayBetweenRecreates;
+ private boolean _recreateBrokers;
+ private long _delayBeforeReCreationStart;
+
+ private volatile boolean RUNNING;
+
+
+ /**
+ * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing.
+ *
+ * @param brokerCount The number of brokers to create
+ * @param delay The delay before and between broker killings
+ */
+ public FailoverBrokerTester(int brokerCount, long delay)
+ {
+ this(brokerCount, delay, delay, false, 0, 0);
+ }
+
+ /**
+ * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing.
+ *
+ * @param brokerCount The number of brokers
+ * @param delayBeforeKillingStart
+ * @param delayBetweenCullings
+ * @param recreateBrokers
+ * @param delayBeforeReCreationStart
+ * @param delayBetweenRecreates
+ */
+ public FailoverBrokerTester(int brokerCount, long delayBeforeKillingStart,
+ long delayBetweenCullings, boolean recreateBrokers,
+ long delayBeforeReCreationStart, long delayBetweenRecreates)
+ {
+ int[] brokers = new int[brokerCount];
+
+ for (int n = 0; n < brokerCount; n++)
+ {
+ brokers[n] = n + 1;
+ }
+ _brokersKilling = _brokers = brokers;
+ _recreateBrokers = recreateBrokers;
+ _delayBeforeKillingStart = delayBeforeKillingStart;
+ _delayBetweenCullings = delayBetweenCullings;
+ _delayBetweenRecreates = delayBetweenRecreates;
+ _delayBeforeReCreationStart = delayBeforeReCreationStart;
+
+ createWorld();
+ }
+
+ /**
+ * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing.
+ *
+ * @param brokerArray Array for broker creation and killing order
+ * @param delayBeforeKillingStart
+ * @param delayBetweenCullings
+ * @param recreateBrokers
+ * @param delayBeforeReCreationStart
+ * @param delayBetweenRecreates
+ */
+ public FailoverBrokerTester(int[] brokerArray, long delayBeforeKillingStart,
+ long delayBetweenCullings, boolean recreateBrokers,
+ long delayBeforeReCreationStart, long delayBetweenRecreates)
+ {
+ _brokersKilling = _brokers = brokerArray;
+ _recreateBrokers = recreateBrokers;
+ _delayBeforeKillingStart = delayBeforeKillingStart;
+ _delayBetweenCullings = delayBetweenCullings;
+ _delayBetweenRecreates = delayBetweenRecreates;
+ _delayBeforeReCreationStart = delayBeforeReCreationStart;
+
+ createWorld();
+ }
+
+ /**
+ * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing.
+ *
+ * @param brokerCreateOrder Array for broker creation order
+ * @param brokerKillOrder Array for broker killing order
+ * @param delayBeforeKillingStart
+ * @param delayBetweenCullings
+ * @param recreateBrokers
+ * @param delayBeforeReCreationStart
+ * @param delayBetweenRecreates
+ */
+ public FailoverBrokerTester(int[] brokerCreateOrder, int[] brokerKillOrder, long delayBeforeKillingStart,
+ long delayBetweenCullings, boolean recreateBrokers,
+ long delayBeforeReCreationStart, long delayBetweenRecreates)
+ {
+ _brokers = brokerCreateOrder;
+ _brokersKilling = brokerKillOrder;
+ _recreateBrokers = recreateBrokers;
+ _delayBeforeKillingStart = delayBeforeKillingStart;
+ _delayBetweenCullings = delayBetweenCullings;
+ _delayBetweenRecreates = delayBetweenRecreates;
+ _delayBeforeReCreationStart = delayBeforeReCreationStart;
+
+ createWorld();
+ }
+
+ private void createWorld()
+ {
+ System.setProperty("amqj.NoAutoCreateVMBroker", "true");
+
+ genesis();
+
+ Thread brokerGod = new Thread(this);
+ brokerGod.setName("Broker God");
+ brokerGod.start();
+ }
+
+
+ private void genesis()
+ {
+ _logger.info("Creating " + _brokers.length + " VM Brokers.");
+ for (int count = 0; count < _brokers.length; count++)
+ {
+ try
+ {
+ TransportConnection.createVMBroker(_brokers[count]);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ ;
+ }
+ }
+ }
+
+ public void run()
+ {
+
+ RUNNING = true;
+ try
+ {
+ _logger.info("Sleeping before culling starts.");
+ Thread.sleep(_delayBeforeKillingStart);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interupted sleeping before killing starts.");
+ }
+
+ Thread brokerGod = new Thread(new BrokerDestroyer());
+ brokerGod.setName("Broker Destroyer");
+ brokerGod.start();
+
+ if (_recreateBrokers)
+ {
+ try
+ {
+ _logger.info("Sleeping before recreation starts.");
+ Thread.sleep(_delayBeforeReCreationStart - _delayBeforeKillingStart);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interupted sleeping before recreation starts.");
+ }
+
+ brokerGod = new Thread(new BrokerCreator());
+ brokerGod.setName("Broker Creator");
+ brokerGod.start();
+ }
+ }
+
+
+ public void stopTesting()
+ {
+ _logger.info("Stopping Broker Tester.");
+ RUNNING = false;
+ }
+
+ class BrokerCreator implements Runnable
+ {
+ public void run()
+ {
+ _logger.info("Created Broker Creator.");
+ while (RUNNING)
+ {
+ for (int count = 0; count < _brokers.length; count++)
+ {
+ try
+ {
+ _logger.info("Creating Broker:" + _brokers[count]);
+ TransportConnection.createVMBroker(_brokers[count]);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ _logger.info("Unable to recreate broker:" + count + ", Port:" + _brokers[count]);
+ }
+ try
+ {
+ Thread.sleep(_delayBetweenRecreates);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interupted between broker recreates.");
+ }
+ }
+ }
+ _logger.info("Ending Broker Creator.");
+ }
+ }
+
+ class BrokerDestroyer implements Runnable
+ {
+ public void run()
+ {
+ _logger.info("Created Broker Destroyer.");
+ while (RUNNING)
+ {
+ for (int count = 0; count < _brokersKilling.length; count++)
+ {
+ _logger.info("Destroying Broker:" + _brokersKilling[count]);
+ killNextBroker(_brokersKilling[count], _delayBetweenCullings);
+ }
+ }
+ _logger.info("Ending Broker Destroyer.");
+ }
+
+ private void killNextBroker(int broker, long delay)
+ {
+
+ //Kill the broker
+ TransportConnection.killVMBroker(broker);
+
+ //Give the client time to get up and going
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Sleeping before broker killing was interrupted,");
+ }
+
+
+ }
+ }
+
+
+}
diff --git a/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java b/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java
index 573bd2af0a..10ec682bf5 100644
--- a/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java
+++ b/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java
@@ -21,8 +21,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.jms.ConnectionListener;
+import org.junit.Assert;
import javax.jms.*;
+import javax.jms.IllegalStateException;
public class FailoverTxTest implements ConnectionListener
{
@@ -38,7 +40,8 @@ public class FailoverTxTest implements ConnectionListener
Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createTemporaryQueue();
- session.createConsumer(queue).setMessageListener(new MessageListener() {
+ session.createConsumer(queue).setMessageListener(new MessageListener()
+ {
public void onMessage(Message message)
{
try
@@ -47,7 +50,7 @@ public class FailoverTxTest implements ConnectionListener
}
catch (JMSException e)
{
- error(e);
+ error(e);
}
}
});
@@ -71,12 +74,15 @@ public class FailoverTxTest implements ConnectionListener
TextMessage msg = session.createTextMessage("Tx=" + i + " msg=" + j);
_log.info("sending message = " + msg.getText());
producer.send(msg);
- try {
+ try
+ {
Thread.sleep(1000);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e)
+ {
throw new RuntimeException("Someone interrupted me!", e);
}
- }
+ }
session.commit();
}
}
@@ -91,7 +97,7 @@ public class FailoverTxTest implements ConnectionListener
{
System.out.println("Stopping...");
try
- {
+ {
_connection.close();
}
catch (JMSException e)
@@ -128,19 +134,46 @@ public class FailoverTxTest implements ConnectionListener
public static void main(final String[] argv) throws Exception
{
- try {
+ int[] creationOrder = {1, 2, 3};
+ int[] killingOrder = {1, 2, 3};
+ long delayBeforeKillingStart = 2000;
+ long delayBetweenCullings = 2000;
+ boolean recreateBrokers = true;
+ long delayBeforeReCreationStart = 4000;
+ long delayBetweenRecreates = 3000;
+
+ FailoverBrokerTester tester = new FailoverBrokerTester(creationOrder, killingOrder, delayBeforeKillingStart, delayBetweenCullings,
+ recreateBrokers, delayBeforeReCreationStart, delayBetweenRecreates);
+
+ try
+ {
final String clientId = "failover" + System.currentTimeMillis();
final String defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
- "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+ "?brokerlist='vm://:1;vm://:2;vm://:3'&failover='roundrobin?cyclecount='2''";
System.out.println("url = [" + defaultUrl + "]");
System.out.println("connection url = [" + new AMQConnectionURL(defaultUrl) + "]");
- final String url = argv.length == 0? defaultUrl : argv[0];
+ final String url = argv.length == 0 ? defaultUrl : argv[0];
new FailoverTxTest(url);
- } catch (Throwable t) {
- _log.error("test failed", t);
+
+ }
+ catch (Throwable t)
+ {
+
+ if (t instanceof IllegalStateException)
+ {
+ t.getMessage().endsWith("has been closed");
+ }
+ else
+ {
+ Assert.fail("Unexpected Exception occured:" + t.getMessage());
+ }
+ }
+ finally
+ {
+ tester.stopTesting();
}
}
}
diff --git a/java/client/test/src/org/apache/qpid/forwardall/Combined.java b/java/client/test/src/org/apache/qpid/forwardall/Combined.java
index 7f32a4a837..2e32798171 100644
--- a/java/client/test/src/org/apache/qpid/forwardall/Combined.java
+++ b/java/client/test/src/org/apache/qpid/forwardall/Combined.java
@@ -18,27 +18,32 @@
package org.apache.qpid.forwardall;
import junit.framework.JUnit4TestAdapter;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
import org.junit.Test;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.After;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
/**
* Runs the Service's and Client parts of the test in the same process
* as the broker
*/
-public class Combined extends VmOrRemoteTestCase
+public class Combined
{
@Test
public void forwardAll() throws Exception
{
int services = 2;
- ServiceCreator.start("tcp://ignore:1234", services);
+ ServiceCreator.start("vm://:1", services);
//give them time to get registered etc.
System.out.println("Services started, waiting for them to initialise...");
Thread.sleep(5 * 1000);
System.out.println("Starting client...");
- new Client("tcp://ignore:1234", services).waitUntilComplete();
+ new Client("vm://:1", services).waitUntilComplete();
System.out.println("Completed successfully!");
}
diff --git a/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java b/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java
index 34e5aa1727..16ec4e2d73 100644
--- a/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java
+++ b/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java
@@ -17,23 +17,46 @@
*/
package org.apache.qpid.requestreply1;
-import org.apache.qpid.vmbroker.VmPipeBroker;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.transport.TransportConnection;
import org.junit.Test;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.After;
import org.apache.log4j.Logger;
import junit.framework.JUnit4TestAdapter;
-public class VmRequestReply extends VmPipeBroker
+public class VmRequestReply
{
private static final Logger _logger = Logger.getLogger(VmRequestReply.class);
+ @Before
+ public void startVmBrokers()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ Assert.fail("Unable to create VM Broker");
+ }
+ }
+
+ @After
+ public void stopVmBrokers()
+ {
+ TransportConnection.killVMBroker(1);
+ }
+
@Test
public void simpleClient() throws Exception
{
- ServiceProvidingClient serviceProvider = new ServiceProvidingClient("tcp://foo:123", "guest", "guest",
+ ServiceProvidingClient serviceProvider = new ServiceProvidingClient("vm://:1", "guest", "guest",
"serviceProvidingClient", "/test",
"serviceQ");
- ServiceRequestingClient serviceRequester = new ServiceRequestingClient("tcp://foo:123", "myClient", "guest", "guest",
+ ServiceRequestingClient serviceRequester = new ServiceRequestingClient("vm://:1", "myClient", "guest", "guest",
"/test", "serviceQ", 5000, 512);
serviceProvider.run();
@@ -58,7 +81,6 @@ public class VmRequestReply extends VmPipeBroker
VmRequestReply rr = new VmRequestReply();
try
{
- rr.initialiseBroker();
rr.simpleClient();
}
catch (Exception e)
diff --git a/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java b/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java
index cec6a72fea..85ecd3398c 100644
--- a/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java
+++ b/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java
@@ -20,13 +20,15 @@ package org.apache.qpid.topic;
import junit.framework.JUnit4TestAdapter;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
import org.apache.qpid.AMQException;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
-import org.apache.qpid.vmbroker.VmPipeBroker;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -36,13 +38,33 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
-public class DurableSubscriptionTest extends VmOrRemoteTestCase
+public class DurableSubscriptionTest
{
+
+ @Before
+ public void startVmBrokers()
+ {
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ Assert.fail("Unable to create VM Broker");
+ }
+ }
+
+ @After
+ public void stopVmBrokers()
+ {
+ TransportConnection.killVMBroker(1);
+ }
+
@Test
public void unsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -84,7 +106,7 @@ public class DurableSubscriptionTest extends VmOrRemoteTestCase
public void durability() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
diff --git a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
index cb5ea4335d..9cf767436a 100644
--- a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
+++ b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java
@@ -21,16 +21,18 @@ import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.testutil.VmOrRemoteTestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Assert;
import javax.jms.*;
-public class TransactedTest extends VmOrRemoteTestCase
+public class TransactedTest
{
private AMQQueue queue1;
private AMQQueue queue2;
@@ -52,28 +54,36 @@ public class TransactedTest extends VmOrRemoteTestCase
@Before
public void setup() throws Exception
{
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ Assert.fail("Unable to create VM Broker");
+ }
+
queue1 = new AMQQueue("Q1", false);
queue2 = new AMQQueue("Q2", false);
- con = new AMQConnection("localhost:5672", "guest", "guest", "TransactedTest", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test");
session = con.createSession(true, 0);
consumer = session.createConsumer(queue1);
producer = session.createProducer(queue2);
con.start();
- prepCon = new AMQConnection("localhost:5672", "guest", "guest", "PrepConnection", "/test");
+ prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer = prepSession.createProducer(queue1);
prepCon.start();
-
//add some messages
prepProducer.send(prepSession.createTextMessage("A"));
prepProducer.send(prepSession.createTextMessage("B"));
prepProducer.send(prepSession.createTextMessage("C"));
- testCon = new AMQConnection("localhost:5672", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -86,6 +96,8 @@ public class TransactedTest extends VmOrRemoteTestCase
con.close();
testCon.close();
prepCon.close();
+
+ TransportConnection.killVMBroker(1);
}
@Test
diff --git a/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java b/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java
deleted file mode 100644
index 1fa09d0524..0000000000
--- a/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.transport.ITransportConnection;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
-
-import java.io.IOException;
-
-public class VmPipeTransportConnection implements ITransportConnection
-{
- private static final Logger _logger = Logger.getLogger(VmPipeTransportConnection.class);
-
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
- {
- final VmPipeConnector ioConnector = new VmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
- ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
- "AsynchronousReadFilter");
- cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
- PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
- "AsynchronousWriteFilter");
- cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
-
- _logger.info("Ignoring broker connection details: " + brokerDetail);
- final VmPipeAddress address = new VmPipeAddress(1);
- _logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
- // wait for connection to complete
- future.join();
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
-}
diff --git a/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java b/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java
deleted file mode 100644
index d86502b609..0000000000
--- a/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.vmbroker;
-
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.qpid.transport.VmPipeTransportConnection;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
-/**
- * This class is a useful base class when you want to run a test where the
- * broker and the client(s) run in the same VM.
- *
- * Once the VmPipeBroker is initialise, and the client transport
- * is overridden (ignores any connection strings) and connects to this
- * in-VM broker instead (rather than using TCP/IP sockets). This means
- * that clients will run unmodified.
- */
-public class VmPipeBroker
-{
- private static final Logger _logger = Logger.getLogger(VmPipeBroker.class);
-
- private IoAcceptor _acceptor;
-
- public void initialiseBroker() throws Exception
- {
- try
- {
- ApplicationRegistry.initialise(new NullApplicationRegistry());
- }
- catch (ConfigurationException e)
- {
- _logger.error("Error configuring application: " + e, e);
- throw e;
- }
-
- _acceptor = new VmPipeAcceptor();
- IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(new ReadWriteThreadModel());
- _acceptor.bind(new VmPipeAddress(1),
- new AMQPProtocolProvider().getHandler());
-
- _logger.info("InVM Qpid.AMQP listening on port " + 1);
- TransportConnection.setInstance(new VmPipeTransportConnection());
- }
-
- public void killBroker()
- {
- _acceptor.unbindAll();
- _acceptor = null;
- }
-}