diff options
22 files changed, 509 insertions, 283 deletions
diff --git a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java index 254b3d43ae..1dccf2f336 100644 --- a/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java +++ b/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java @@ -20,17 +20,20 @@ package org.apache.qpid.ack; import junit.framework.JUnit4TestAdapter; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.junit.Assert; import org.junit.Test; +import org.junit.After; +import org.junit.Before; import javax.jms.*; -public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase +public class DisconnectAndRedeliverTest { private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); @@ -40,21 +43,30 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase DOMConfigurator.configure("broker/etc/log4j.xml"); } + @After + public void stopVmBroker() + { + TransportConnection.killVMBroker(1); + } + /** * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when * the channel is closed. + * * @throws Exception */ @Test public void disconnectRedeliversMessages() throws Exception { - Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); MessageConsumer consumer = consumerSession.createConsumer(queue); - Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -64,6 +76,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase producer.send(producerSession.createTextMessage("msg3")); producer.send(producerSession.createTextMessage("msg4")); + con2.close(); + _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); @@ -75,7 +89,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase _logger.info("Received all four messages. About to disconnect and reconnect"); con.close(); - con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); consumer = consumerSession.createConsumer(queue); @@ -85,9 +99,11 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase tm = (TextMessage) consumer.receive(3000); Assert.assertEquals(tm.getText(), "msg2"); + tm = (TextMessage) consumer.receive(3000); Assert.assertEquals(tm.getText(), "msg3"); + tm = (TextMessage) consumer.receive(3000); Assert.assertEquals(tm.getText(), "msg4"); @@ -96,7 +112,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase con.close(); - con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); consumer = consumerSession.createConsumer(queue); _logger.info("Starting third consumer connection"); @@ -105,23 +121,38 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase Assert.assertNull(tm); _logger.info("No messages redelivered as is expected"); con.close(); + + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + _logger.info("Starting fourth consumer connection"); + con.start(); + tm = (TextMessage) consumer.receive(3000); + Assert.assertNull(tm); + _logger.info("No messages redelivered as is expected"); + con.close(); + + _logger.info("Actually:" + store.getMessageMap().size()); + // Assert.assertTrue(store.getMessageMap().size() == 0); } /** * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be * requeued (due perhaps to the queue being deleted). + * * @throws Exception */ @Test public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception { - Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = new AMQQueue("someQ", "someQ", false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); - Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -131,6 +162,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase producer.send(producerSession.createTextMessage("msg3")); producer.send(producerSession.createTextMessage("msg4")); + con2.close(); + _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); @@ -142,7 +175,7 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase _logger.info("Received all four messages. About to disconnect and reconnect"); con.close(); - con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); consumer = consumerSession.createConsumer(queue); @@ -152,7 +185,8 @@ public class DisconnectAndRedeliverTest extends VmOrRemoteTestCase tm = (TextMessage) consumer.receiveNoWait(); Assert.assertNull(tm); _logger.info("No messages redelivered as is expected"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + + _logger.info("Actually:" + store.getMessageMap().size()); Assert.assertTrue(store.getMessageMap().size() == 0); con.close(); } diff --git a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java index 0ab40aa67c..cd7c43d4a1 100644 --- a/java/client/test/src/org/apache/qpid/ack/RecoverTest.java +++ b/java/client/test/src/org/apache/qpid/ack/RecoverTest.java @@ -20,15 +20,18 @@ package org.apache.qpid.ack; import junit.framework.JUnit4TestAdapter; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.junit.Assert; import org.junit.Test; +import org.junit.Before; +import org.junit.After; import javax.jms.*; -public class RecoverTest extends VmOrRemoteTestCase +public class RecoverTest { private static final Logger _logger = Logger.getLogger(RecoverTest.class); @@ -38,16 +41,22 @@ public class RecoverTest extends VmOrRemoteTestCase DOMConfigurator.configure("broker/etc/log4j.xml"); } + @After + public void stopVmBroker() + { + TransportConnection.killVMBroker(1); + } + @Test public void recoverResendsMsgs() throws Exception { - Connection con = new AMQConnection("foo", 1, "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = new AMQQueue("someQ", "someQ", false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); - Connection con2 = new AMQConnection("bar", 2, "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -57,6 +66,8 @@ public class RecoverTest extends VmOrRemoteTestCase producer.send(producerSession.createTextMessage("msg3")); producer.send(producerSession.createTextMessage("msg4")); + con2.close(); + _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); @@ -87,6 +98,9 @@ public class RecoverTest extends VmOrRemoteTestCase tm = (TextMessage) consumer.receiveNoWait(); Assert.assertNull(tm); _logger.info("No messages redelivered as is expected"); + + con.close(); + } public static junit.framework.Test suite() diff --git a/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java b/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java index a5cda66982..e8a9debe1d 100644 --- a/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java +++ b/java/client/test/src/org/apache/qpid/basic/BytesMessageTest.java @@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSBytesMessage; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.apache.mina.common.ByteBuffer; import org.junit.Test; @@ -32,7 +31,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListener +public class BytesMessageTest implements MessageListener { private Connection _connection; private Destination _destination; @@ -40,10 +39,11 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe private final List<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>(); private final List<byte[]> messages = new ArrayList<byte[]>(); private int _count = 100; + public String _connectionString = "vm://:1"; void init() throws Exception { - init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } void init(AMQConnection connection) throws Exception @@ -182,7 +182,7 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe final int count; if (argv.length == 0) { - connectionString = "localhost:5672"; + connectionString = "vm://:1"; count = 100; } else @@ -195,7 +195,7 @@ public class BytesMessageTest extends VmOrRemoteTestCase implements MessageListe System.out.println("count = " + count); BytesMessageTest test = new BytesMessageTest(); - test.setConnectionString(connectionString); + test._connectionString = connectionString; test._count = count; test.test(); } diff --git a/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java b/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java index 6417544d01..82e43773a4 100644 --- a/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java +++ b/java/client/test/src/org/apache/qpid/basic/FieldTableMessageTest.java @@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSBytesMessage; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableTest; @@ -36,7 +35,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Enumeration; -public class FieldTableMessageTest extends VmOrRemoteTestCase implements MessageListener +public class FieldTableMessageTest implements MessageListener { private AMQConnection _connection; private AMQDestination _destination; @@ -44,11 +43,12 @@ public class FieldTableMessageTest extends VmOrRemoteTestCase implements Message private final ArrayList<JMSBytesMessage> received = new ArrayList<JMSBytesMessage>(); private FieldTable _expected; private int _count = 10; + public String _connectionString = "vm://:1"; @Before public void init() throws Exception { - init(new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } private void init(AMQConnection connection) throws Exception @@ -143,7 +143,7 @@ public class FieldTableMessageTest extends VmOrRemoteTestCase implements Message public static void main(String[] argv) throws Exception { FieldTableMessageTest test = new FieldTableMessageTest(); - test.setConnectionString((argv.length == 0 ? "localhost:5672" : argv[0])); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; test.init(); test._count = argv.length > 1 ? Integer.parseInt(argv[1]) : 5; test.test(); diff --git a/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java b/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java index 28ec516243..8bf6fc1991 100644 --- a/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java +++ b/java/client/test/src/org/apache/qpid/basic/MultipleConnectionTest.java @@ -22,7 +22,6 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.junit.Test; import javax.jms.*; @@ -30,8 +29,10 @@ import javax.jms.*; /** * This is a slow test. */ -public class MultipleConnectionTest extends VmOrRemoteTestCase +public class MultipleConnectionTest { + public static String _connectionString="vm://:1"; + private static class Receiver { private AMQConnection _connection; @@ -158,20 +159,20 @@ public class MultipleConnectionTest extends VmOrRemoteTestCase public static void main(String[] argv) throws Exception { - String broker = argv.length > 0 ? argv[0] : "localhost:5672"; + String broker = argv.length > 0 ? argv[0] : "vm://:1"; int connections = 7; int sessions = 2; MultipleConnectionTest test = new MultipleConnectionTest(); - test.setConnectionString(broker); + test._connectionString = broker; test.test(); } @Test public void test() throws Exception { - String broker = getConnectionString(); + String broker = _connectionString; int messages = 10; AMQTopic topic = new AMQTopic("amq.topic"); diff --git a/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java b/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java index d4e1315ccb..a78cd6f72b 100644 --- a/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java +++ b/java/client/test/src/org/apache/qpid/basic/ObjectMessageTest.java @@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSObjectMessage; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.junit.Before; import org.junit.Test; @@ -36,7 +35,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageListener +public class ObjectMessageTest implements MessageListener { private AMQConnection _connection; private AMQDestination _destination; @@ -44,11 +43,12 @@ public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageList private final List<JMSObjectMessage> received = new ArrayList<JMSObjectMessage>(); private final List<Payload> messages = new ArrayList<Payload>(); private int _count = 100; + public String _connectionString = "vm://:1"; @Before public void init() throws Exception { - String broker = getConnectionString(); + String broker = _connectionString; init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path")); } @@ -190,7 +190,7 @@ public class ObjectMessageTest extends VmOrRemoteTestCase implements MessageList public static void main(String[] argv) throws Exception { ObjectMessageTest test = new ObjectMessageTest(); - test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; test.init(); if (argv.length > 1) { diff --git a/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java b/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java index e1ef686d32..5b7db2b77c 100644 --- a/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java +++ b/java/client/test/src/org/apache/qpid/basic/ReceiveTest.java @@ -22,30 +22,25 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; + import org.junit.Before; import org.junit.Test; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.MessageConsumer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -public class ReceiveTest extends VmOrRemoteTestCase +public class ReceiveTest { private AMQConnection _connection; private AMQDestination _destination; private AMQSession _session; private MessageConsumer _consumer; + public String _connectionString = "vm://:1"; + @Before public void init() throws Exception { - String broker = getConnectionString(); + String broker = _connectionString; init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path")); } @@ -73,7 +68,7 @@ public class ReceiveTest extends VmOrRemoteTestCase public static void main(String[] argv) throws Exception { ReceiveTest test = new ReceiveTest(); - test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; test.init(); test.test(); } diff --git a/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java b/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java index e9ce8ab7c3..a5f7679ffd 100644 --- a/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java +++ b/java/client/test/src/org/apache/qpid/basic/SessionStartTest.java @@ -22,26 +22,30 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; + import org.junit.Before; import org.junit.Test; +import org.junit.Assert; +import org.junit.After; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -public class SessionStartTest extends VmOrRemoteTestCase implements MessageListener +public class SessionStartTest implements MessageListener { private AMQConnection _connection; private AMQDestination _destination; private AMQSession _session; private int count; + public String _connectionString = "vm://:1"; @Before public void init() throws Exception { - String broker = getConnectionString(); - init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } private void init(AMQConnection connection) throws Exception @@ -97,7 +101,7 @@ public class SessionStartTest extends VmOrRemoteTestCase implements MessageListe public static void main(String[] argv) throws Exception { SessionStartTest test = new SessionStartTest(); - test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]); + test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0]; test.init(); test.test(); } diff --git a/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java b/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java index a4e2cfd2a1..4f4bbb3028 100644 --- a/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java +++ b/java/client/test/src/org/apache/qpid/basic/TextMessageTest.java @@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.junit.Before; import org.junit.Test; @@ -35,7 +34,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -public class TextMessageTest extends VmOrRemoteTestCase implements MessageListener +public class TextMessageTest implements MessageListener { private AMQConnection _connection; private AMQDestination _destination; @@ -43,12 +42,12 @@ public class TextMessageTest extends VmOrRemoteTestCase implements MessageListen private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>(); private final List<String> messages = new ArrayList<String>(); private int _count = 100; + public String _connectionString ="vm://:1"; @Before public void init() throws Exception { - String broker = getConnectionString(); - init(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); } private void init(AMQConnection connection) throws Exception @@ -164,7 +163,7 @@ public class TextMessageTest extends VmOrRemoteTestCase implements MessageListen public static void main(String[] argv) throws Exception { TextMessageTest test = new TextMessageTest(); - test.setConnectionString(argv.length == 0 ? "localhost:5672" : argv[0]); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; test.init(); if (argv.length > 1) test._count = Integer.parseInt(argv[1]); test.test(); diff --git a/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java b/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java index 678261097f..be384f42d8 100644 --- a/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/test/src/org/apache/qpid/client/channelclose/ChannelCloseOkTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.client.channelclose; import junit.framework.JUnit4TestAdapter; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.apache.log4j.Logger; import org.junit.After; import static org.junit.Assert.assertEquals; @@ -45,7 +44,7 @@ import java.util.List; * 3. Since client does not have an exception listener, currently all sessions are * closed. */ -public class ChannelCloseOkTest extends VmOrRemoteTestCase +public class ChannelCloseOkTest { private Connection _connection; private Destination _destination1; @@ -56,11 +55,12 @@ public class ChannelCloseOkTest extends VmOrRemoteTestCase private final List<Message> _received2 = new ArrayList<Message>(); private final static Logger _log = Logger.getLogger(ChannelCloseOkTest.class); + public String _connectionString = "vm://:1"; @Before public void init() throws Exception { - _connection = new AMQConnection(getConnectionString(), "guest", "guest", randomize("Client"), "/test_path"); + _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"); _destination1 = new AMQQueue("q1", true); _destination2 = new AMQQueue("q2", true); _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java b/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java index 30eb115a7d..a5e1a0d558 100644 --- a/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java +++ b/java/client/test/src/org/apache/qpid/client/message/ObjectMessageTest.java @@ -21,6 +21,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.junit.Before; +import org.junit.Test; import javax.jms.MessageListener; import javax.jms.MessageProducer; @@ -31,6 +33,8 @@ import java.io.Serializable; import java.util.HashMap; import java.util.ArrayList; +import junit.framework.JUnit4TestAdapter; + public class ObjectMessageTest implements MessageListener { private final AMQConnection connection; @@ -181,7 +185,7 @@ public class ObjectMessageTest implements MessageListener public static void main(String[] argv) throws Exception { - String broker = argv.length > 0 ? argv[0] : "localhost:5672"; + String broker = argv.length > 0 ? argv[0] : "vm://:1"; if("-help".equals(broker)) { System.out.println("Usage: <broker>"); @@ -244,4 +248,5 @@ public class ObjectMessageTest implements MessageListener { return in + System.currentTimeMillis(); } + } diff --git a/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java b/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java deleted file mode 100644 index 536f93a387..0000000000 --- a/java/client/test/src/org/apache/qpid/client/testutil/VmOrRemoteTestCase.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.client.testutil; - -import org.apache.qpid.vmbroker.VmPipeBroker; -import org.junit.After; -import org.junit.Before; - -public class VmOrRemoteTestCase -{ - String _connectionString = "vm:1"; - - VmPipeBroker _vmBroker; - - public boolean isVm() { - return "vm:1".equals(_connectionString); - } - - public void setConnectionString(final String connectionString) { - this._connectionString = connectionString; - } - - public String getConnectionString() - { - return _connectionString; - } - - @Before - public void startVmBroker() throws Exception { - if (isVm()) { - _vmBroker = new VmPipeBroker(); - _vmBroker.initialiseBroker(); - } - } - - @After - public void stopVmBroker() { - _vmBroker.killBroker(); - } - -} diff --git a/java/client/test/src/org/apache/qpid/cluster/Client.java b/java/client/test/src/org/apache/qpid/cluster/Client.java index 9156d08060..1d12f157b9 100644 --- a/java/client/test/src/org/apache/qpid/cluster/Client.java +++ b/java/client/test/src/org/apache/qpid/cluster/Client.java @@ -117,6 +117,8 @@ public class Client public static void main(String[] argv) throws AMQException, JMSException, InterruptedException, URLSyntaxException { //assume args describe the set of brokers to try - new Client(new AMQConnection(argv[0], "guest", "guest", argv[1], "/test"), argv[1]); + + String clientName = argv.length > 1 ? argv[1] : "testClient"; + new Client(new AMQConnection(argv.length > 0 ? argv[0] : "vm://:1", "guest", "guest", clientName, "/test"), clientName); } } diff --git a/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java b/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java index 8552deb8fc..4456037c2e 100644 --- a/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java +++ b/java/client/test/src/org/apache/qpid/connection/TestManyConnections.java @@ -21,11 +21,10 @@ import junit.framework.JUnit4TestAdapter; import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.apache.log4j.Logger; import org.junit.Test; -public class TestManyConnections extends VmOrRemoteTestCase +public class TestManyConnections { private static final Logger _log = Logger.getLogger(TestManyConnections.class); @@ -44,7 +43,7 @@ public class TestManyConnections extends VmOrRemoteTestCase long startTime = System.currentTimeMillis(); for (int i = 0; i < count; i++) { - createConnection(i, "tcp://foo", "myClient" + i, "guest", "guest", "/test"); + createConnection(i, "vm://:1", "myClient" + i, "guest", "guest", "/test"); } long endTime = System.currentTimeMillis(); _log.info("Time to create " + count + " connections: " + (endTime - startTime) + diff --git a/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java b/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java new file mode 100644 index 0000000000..6f5c5021fa --- /dev/null +++ b/java/client/test/src/org/apache/qpid/failover/FailoverBrokerTester.java @@ -0,0 +1,270 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.failover; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; + +public class FailoverBrokerTester implements Runnable +{ + private static final Logger _logger = Logger.getLogger(FailoverBrokerTester.class); + + private int[] _brokers; + private int[] _brokersKilling; + private long _delayBeforeKillingStart; + private long _delayBetweenCullings; + private long _delayBetweenRecreates; + private boolean _recreateBrokers; + private long _delayBeforeReCreationStart; + + private volatile boolean RUNNING; + + + /** + * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing. + * + * @param brokerCount The number of brokers to create + * @param delay The delay before and between broker killings + */ + public FailoverBrokerTester(int brokerCount, long delay) + { + this(brokerCount, delay, delay, false, 0, 0); + } + + /** + * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing. + * + * @param brokerCount The number of brokers + * @param delayBeforeKillingStart + * @param delayBetweenCullings + * @param recreateBrokers + * @param delayBeforeReCreationStart + * @param delayBetweenRecreates + */ + public FailoverBrokerTester(int brokerCount, long delayBeforeKillingStart, + long delayBetweenCullings, boolean recreateBrokers, + long delayBeforeReCreationStart, long delayBetweenRecreates) + { + int[] brokers = new int[brokerCount]; + + for (int n = 0; n < brokerCount; n++) + { + brokers[n] = n + 1; + } + _brokersKilling = _brokers = brokers; + _recreateBrokers = recreateBrokers; + _delayBeforeKillingStart = delayBeforeKillingStart; + _delayBetweenCullings = delayBetweenCullings; + _delayBetweenRecreates = delayBetweenRecreates; + _delayBeforeReCreationStart = delayBeforeReCreationStart; + + createWorld(); + } + + /** + * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing. + * + * @param brokerArray Array for broker creation and killing order + * @param delayBeforeKillingStart + * @param delayBetweenCullings + * @param recreateBrokers + * @param delayBeforeReCreationStart + * @param delayBetweenRecreates + */ + public FailoverBrokerTester(int[] brokerArray, long delayBeforeKillingStart, + long delayBetweenCullings, boolean recreateBrokers, + long delayBeforeReCreationStart, long delayBetweenRecreates) + { + _brokersKilling = _brokers = brokerArray; + _recreateBrokers = recreateBrokers; + _delayBeforeKillingStart = delayBeforeKillingStart; + _delayBetweenCullings = delayBetweenCullings; + _delayBetweenRecreates = delayBetweenRecreates; + _delayBeforeReCreationStart = delayBeforeReCreationStart; + + createWorld(); + } + + /** + * An InVM Broker Tester. Creates then kills VM brokers to allow failover testing. + * + * @param brokerCreateOrder Array for broker creation order + * @param brokerKillOrder Array for broker killing order + * @param delayBeforeKillingStart + * @param delayBetweenCullings + * @param recreateBrokers + * @param delayBeforeReCreationStart + * @param delayBetweenRecreates + */ + public FailoverBrokerTester(int[] brokerCreateOrder, int[] brokerKillOrder, long delayBeforeKillingStart, + long delayBetweenCullings, boolean recreateBrokers, + long delayBeforeReCreationStart, long delayBetweenRecreates) + { + _brokers = brokerCreateOrder; + _brokersKilling = brokerKillOrder; + _recreateBrokers = recreateBrokers; + _delayBeforeKillingStart = delayBeforeKillingStart; + _delayBetweenCullings = delayBetweenCullings; + _delayBetweenRecreates = delayBetweenRecreates; + _delayBeforeReCreationStart = delayBeforeReCreationStart; + + createWorld(); + } + + private void createWorld() + { + System.setProperty("amqj.NoAutoCreateVMBroker", "true"); + + genesis(); + + Thread brokerGod = new Thread(this); + brokerGod.setName("Broker God"); + brokerGod.start(); + } + + + private void genesis() + { + _logger.info("Creating " + _brokers.length + " VM Brokers."); + for (int count = 0; count < _brokers.length; count++) + { + try + { + TransportConnection.createVMBroker(_brokers[count]); + } + catch (AMQVMBrokerCreationException e) + { + ; + } + } + } + + public void run() + { + + RUNNING = true; + try + { + _logger.info("Sleeping before culling starts."); + Thread.sleep(_delayBeforeKillingStart); + } + catch (InterruptedException e) + { + _logger.info("Interupted sleeping before killing starts."); + } + + Thread brokerGod = new Thread(new BrokerDestroyer()); + brokerGod.setName("Broker Destroyer"); + brokerGod.start(); + + if (_recreateBrokers) + { + try + { + _logger.info("Sleeping before recreation starts."); + Thread.sleep(_delayBeforeReCreationStart - _delayBeforeKillingStart); + } + catch (InterruptedException e) + { + _logger.info("Interupted sleeping before recreation starts."); + } + + brokerGod = new Thread(new BrokerCreator()); + brokerGod.setName("Broker Creator"); + brokerGod.start(); + } + } + + + public void stopTesting() + { + _logger.info("Stopping Broker Tester."); + RUNNING = false; + } + + class BrokerCreator implements Runnable + { + public void run() + { + _logger.info("Created Broker Creator."); + while (RUNNING) + { + for (int count = 0; count < _brokers.length; count++) + { + try + { + _logger.info("Creating Broker:" + _brokers[count]); + TransportConnection.createVMBroker(_brokers[count]); + } + catch (AMQVMBrokerCreationException e) + { + _logger.info("Unable to recreate broker:" + count + ", Port:" + _brokers[count]); + } + try + { + Thread.sleep(_delayBetweenRecreates); + } + catch (InterruptedException e) + { + _logger.info("Interupted between broker recreates."); + } + } + } + _logger.info("Ending Broker Creator."); + } + } + + class BrokerDestroyer implements Runnable + { + public void run() + { + _logger.info("Created Broker Destroyer."); + while (RUNNING) + { + for (int count = 0; count < _brokersKilling.length; count++) + { + _logger.info("Destroying Broker:" + _brokersKilling[count]); + killNextBroker(_brokersKilling[count], _delayBetweenCullings); + } + } + _logger.info("Ending Broker Destroyer."); + } + + private void killNextBroker(int broker, long delay) + { + + //Kill the broker + TransportConnection.killVMBroker(broker); + + //Give the client time to get up and going + try + { + Thread.sleep(delay); + } + catch (InterruptedException e) + { + _logger.info("Sleeping before broker killing was interrupted,"); + } + + + } + } + + +} diff --git a/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java b/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java index 573bd2af0a..10ec682bf5 100644 --- a/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java +++ b/java/client/test/src/org/apache/qpid/failover/FailoverTxTest.java @@ -21,8 +21,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.jms.ConnectionListener; +import org.junit.Assert; import javax.jms.*; +import javax.jms.IllegalStateException; public class FailoverTxTest implements ConnectionListener { @@ -38,7 +40,8 @@ public class FailoverTxTest implements ConnectionListener Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createTemporaryQueue(); - session.createConsumer(queue).setMessageListener(new MessageListener() { + session.createConsumer(queue).setMessageListener(new MessageListener() + { public void onMessage(Message message) { try @@ -47,7 +50,7 @@ public class FailoverTxTest implements ConnectionListener } catch (JMSException e) { - error(e); + error(e); } } }); @@ -71,12 +74,15 @@ public class FailoverTxTest implements ConnectionListener TextMessage msg = session.createTextMessage("Tx=" + i + " msg=" + j); _log.info("sending message = " + msg.getText()); producer.send(msg); - try { + try + { Thread.sleep(1000); - } catch (InterruptedException e) { + } + catch (InterruptedException e) + { throw new RuntimeException("Someone interrupted me!", e); } - } + } session.commit(); } } @@ -91,7 +97,7 @@ public class FailoverTxTest implements ConnectionListener { System.out.println("Stopping..."); try - { + { _connection.close(); } catch (JMSException e) @@ -128,19 +134,46 @@ public class FailoverTxTest implements ConnectionListener public static void main(final String[] argv) throws Exception { - try { + int[] creationOrder = {1, 2, 3}; + int[] killingOrder = {1, 2, 3}; + long delayBeforeKillingStart = 2000; + long delayBetweenCullings = 2000; + boolean recreateBrokers = true; + long delayBeforeReCreationStart = 4000; + long delayBetweenRecreates = 3000; + + FailoverBrokerTester tester = new FailoverBrokerTester(creationOrder, killingOrder, delayBeforeKillingStart, delayBetweenCullings, + recreateBrokers, delayBeforeReCreationStart, delayBetweenRecreates); + + try + { final String clientId = "failover" + System.currentTimeMillis(); final String defaultUrl = "amqp://guest:guest@" + clientId + "/test" + - "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; + "?brokerlist='vm://:1;vm://:2;vm://:3'&failover='roundrobin?cyclecount='2''"; System.out.println("url = [" + defaultUrl + "]"); System.out.println("connection url = [" + new AMQConnectionURL(defaultUrl) + "]"); - final String url = argv.length == 0? defaultUrl : argv[0]; + final String url = argv.length == 0 ? defaultUrl : argv[0]; new FailoverTxTest(url); - } catch (Throwable t) { - _log.error("test failed", t); + + } + catch (Throwable t) + { + + if (t instanceof IllegalStateException) + { + t.getMessage().endsWith("has been closed"); + } + else + { + Assert.fail("Unexpected Exception occured:" + t.getMessage()); + } + } + finally + { + tester.stopTesting(); } } } diff --git a/java/client/test/src/org/apache/qpid/forwardall/Combined.java b/java/client/test/src/org/apache/qpid/forwardall/Combined.java index 7f32a4a837..2e32798171 100644 --- a/java/client/test/src/org/apache/qpid/forwardall/Combined.java +++ b/java/client/test/src/org/apache/qpid/forwardall/Combined.java @@ -18,27 +18,32 @@ package org.apache.qpid.forwardall; import junit.framework.JUnit4TestAdapter; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; import org.junit.Test; +import org.junit.Before; +import org.junit.Assert; +import org.junit.After; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; /** * Runs the Service's and Client parts of the test in the same process * as the broker */ -public class Combined extends VmOrRemoteTestCase +public class Combined { @Test public void forwardAll() throws Exception { int services = 2; - ServiceCreator.start("tcp://ignore:1234", services); + ServiceCreator.start("vm://:1", services); //give them time to get registered etc. System.out.println("Services started, waiting for them to initialise..."); Thread.sleep(5 * 1000); System.out.println("Starting client..."); - new Client("tcp://ignore:1234", services).waitUntilComplete(); + new Client("vm://:1", services).waitUntilComplete(); System.out.println("Completed successfully!"); } diff --git a/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java b/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java index 34e5aa1727..16ec4e2d73 100644 --- a/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java +++ b/java/client/test/src/org/apache/qpid/requestreply1/VmRequestReply.java @@ -17,23 +17,46 @@ */ package org.apache.qpid.requestreply1; -import org.apache.qpid.vmbroker.VmPipeBroker; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.client.transport.TransportConnection; import org.junit.Test; +import org.junit.Before; +import org.junit.Assert; +import org.junit.After; import org.apache.log4j.Logger; import junit.framework.JUnit4TestAdapter; -public class VmRequestReply extends VmPipeBroker +public class VmRequestReply { private static final Logger _logger = Logger.getLogger(VmRequestReply.class); + @Before + public void startVmBrokers() + { + try + { + TransportConnection.createVMBroker(1); + } + catch (AMQVMBrokerCreationException e) + { + Assert.fail("Unable to create VM Broker"); + } + } + + @After + public void stopVmBrokers() + { + TransportConnection.killVMBroker(1); + } + @Test public void simpleClient() throws Exception { - ServiceProvidingClient serviceProvider = new ServiceProvidingClient("tcp://foo:123", "guest", "guest", + ServiceProvidingClient serviceProvider = new ServiceProvidingClient("vm://:1", "guest", "guest", "serviceProvidingClient", "/test", "serviceQ"); - ServiceRequestingClient serviceRequester = new ServiceRequestingClient("tcp://foo:123", "myClient", "guest", "guest", + ServiceRequestingClient serviceRequester = new ServiceRequestingClient("vm://:1", "myClient", "guest", "guest", "/test", "serviceQ", 5000, 512); serviceProvider.run(); @@ -58,7 +81,6 @@ public class VmRequestReply extends VmPipeBroker VmRequestReply rr = new VmRequestReply(); try { - rr.initialiseBroker(); rr.simpleClient(); } catch (Exception e) diff --git a/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java b/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java index cec6a72fea..85ecd3398c 100644 --- a/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java +++ b/java/client/test/src/org/apache/qpid/topic/DurableSubscriptionTest.java @@ -20,13 +20,15 @@ package org.apache.qpid.topic; import junit.framework.JUnit4TestAdapter; import org.junit.Assert; import org.junit.Test; +import org.junit.Before; +import org.junit.After; import org.apache.qpid.AMQException; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; -import org.apache.qpid.vmbroker.VmPipeBroker; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.JMSException; import javax.jms.Message; @@ -36,13 +38,33 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicSubscriber; -public class DurableSubscriptionTest extends VmOrRemoteTestCase +public class DurableSubscriptionTest { + + @Before + public void startVmBrokers() + { + try + { + TransportConnection.createVMBroker(1); + } + catch (AMQVMBrokerCreationException e) + { + Assert.fail("Unable to create VM Broker"); + } + } + + @After + public void stopVmBrokers() + { + TransportConnection.killVMBroker(1); + } + @Test public void unsubscribe() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); @@ -84,7 +106,7 @@ public class DurableSubscriptionTest extends VmOrRemoteTestCase public void durability() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); diff --git a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java index cb5ea4335d..9cf767436a 100644 --- a/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java +++ b/java/client/test/src/org/apache/qpid/transacted/TransactedTest.java @@ -21,16 +21,18 @@ import junit.framework.JUnit4TestAdapter; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.testutil.VmOrRemoteTestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.vmbroker.AMQVMBrokerCreationException; import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import javax.jms.*; -public class TransactedTest extends VmOrRemoteTestCase +public class TransactedTest { private AMQQueue queue1; private AMQQueue queue2; @@ -52,28 +54,36 @@ public class TransactedTest extends VmOrRemoteTestCase @Before public void setup() throws Exception { + try + { + TransportConnection.createVMBroker(1); + } + catch (AMQVMBrokerCreationException e) + { + Assert.fail("Unable to create VM Broker"); + } + queue1 = new AMQQueue("Q1", false); queue2 = new AMQQueue("Q2", false); - con = new AMQConnection("localhost:5672", "guest", "guest", "TransactedTest", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test"); session = con.createSession(true, 0); consumer = session.createConsumer(queue1); producer = session.createProducer(queue2); con.start(); - prepCon = new AMQConnection("localhost:5672", "guest", "guest", "PrepConnection", "/test"); + prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); prepProducer = prepSession.createProducer(queue1); prepCon.start(); - //add some messages prepProducer.send(prepSession.createTextMessage("A")); prepProducer.send(prepSession.createTextMessage("B")); prepProducer.send(prepSession.createTextMessage("C")); - testCon = new AMQConnection("localhost:5672", "guest", "guest", "TestConnection", "/test"); + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer1 = testSession.createConsumer(queue1); testConsumer2 = testSession.createConsumer(queue2); @@ -86,6 +96,8 @@ public class TransactedTest extends VmOrRemoteTestCase con.close(); testCon.close(); prepCon.close(); + + TransportConnection.killVMBroker(1); } @Test diff --git a/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java b/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java deleted file mode 100644 index 1fa09d0524..0000000000 --- a/java/client/test/src/org/apache/qpid/transport/VmPipeTransportConnection.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.transport; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.transport.ITransportConnection; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.log4j.Logger; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; - -import java.io.IOException; - -public class VmPipeTransportConnection implements ITransportConnection -{ - private static final Logger _logger = Logger.getLogger(VmPipeTransportConnection.class); - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - final VmPipeConnector ioConnector = new VmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, - "AsynchronousReadFilter"); - cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); - PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, - "AsynchronousWriteFilter"); - cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); - - _logger.info("Ignoring broker connection details: " + brokerDetail); - final VmPipeAddress address = new VmPipeAddress(1); - _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); - // wait for connection to complete - future.join(); - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } -} diff --git a/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java b/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java deleted file mode 100644 index d86502b609..0000000000 --- a/java/client/test/src/org/apache/qpid/vmbroker/VmPipeBroker.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.vmbroker; - -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.protocol.AMQPProtocolProvider; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.NullApplicationRegistry; -import org.apache.qpid.transport.VmPipeTransportConnection; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.vmpipe.VmPipeAcceptor; -import org.apache.mina.transport.vmpipe.VmPipeAddress; - -/** - * This class is a useful base class when you want to run a test where the - * broker and the client(s) run in the same VM. - * - * Once the VmPipeBroker is initialise, and the client transport - * is overridden (ignores any connection strings) and connects to this - * in-VM broker instead (rather than using TCP/IP sockets). This means - * that clients will run unmodified. - */ -public class VmPipeBroker -{ - private static final Logger _logger = Logger.getLogger(VmPipeBroker.class); - - private IoAcceptor _acceptor; - - public void initialiseBroker() throws Exception - { - try - { - ApplicationRegistry.initialise(new NullApplicationRegistry()); - } - catch (ConfigurationException e) - { - _logger.error("Error configuring application: " + e, e); - throw e; - } - - _acceptor = new VmPipeAcceptor(); - IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(new ReadWriteThreadModel()); - _acceptor.bind(new VmPipeAddress(1), - new AMQPProtocolProvider().getHandler()); - - _logger.info("InVM Qpid.AMQP listening on port " + 1); - TransportConnection.setInstance(new VmPipeTransportConnection()); - } - - public void killBroker() - { - _acceptor.unbindAll(); - _acceptor = null; - } -} |