diff options
Diffstat (limited to 'qpid/java/systests/src/main/java')
55 files changed, 1774 insertions, 3696 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java b/qpid/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java new file mode 100644 index 0000000000..5323ad28bf --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.vmpipe.support; + +import org.apache.mina.common.IdleStatus; + +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * This file is a patch to override MINA, because of the IdentityHashMap bug. Workaround to be supplied in MINA 1.0.7. + * This patched file will be removed once upgraded onto a newer MINA. + * + * Dectects idle sessions and fires <tt>sessionIdle</tt> events to them. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + */ +public class VmPipeIdleStatusChecker +{ + private static final VmPipeIdleStatusChecker INSTANCE = new VmPipeIdleStatusChecker(); + + public static VmPipeIdleStatusChecker getInstance() + { + return INSTANCE; + } + + private final Map sessions = new HashMap(); // will use as a set + + private final Worker worker = new Worker(); + + private VmPipeIdleStatusChecker() + { + worker.start(); + } + + public void addSession(VmPipeSessionImpl session) + { + synchronized (sessions) + { + sessions.put(session, session); + } + } + + private class Worker extends Thread + { + private Worker() + { + super("VmPipeIdleStatusChecker"); + setDaemon(true); + } + + public void run() + { + for (;;) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { } + + long currentTime = System.currentTimeMillis(); + + synchronized (sessions) + { + Iterator it = sessions.keySet().iterator(); + while (it.hasNext()) + { + VmPipeSessionImpl session = (VmPipeSessionImpl) it.next(); + if (!session.isConnected()) + { + it.remove(); + } + else + { + notifyIdleSession(session, currentTime); + } + } + } + } + } + } + + private void notifyIdleSession(VmPipeSessionImpl session, long currentTime) + { + notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, + Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); + notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, + Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); + notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, + Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); + } + + private void notifyIdleSession0(VmPipeSessionImpl session, long currentTime, long idleTime, IdleStatus status, + long lastIoTime) + { + if ((idleTime > 0) && (lastIoTime != 0) && ((currentTime - lastIoTime) >= idleTime)) + { + session.increaseIdleCount(status); + session.getFilterChain().fireSessionIdle(session, status); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index 986297bfe1..ca10126aa7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -30,6 +30,7 @@ import javax.jms.TextMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.transport.TransportConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java index 29a44ecec3..a8a23c2c41 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java @@ -36,6 +36,7 @@ import javax.jms.Session; import javax.naming.Context; import javax.naming.spi.InitialContextFactory; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java new file mode 100644 index 0000000000..29b4dd82a7 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.client.transport.TransportConnection; + +import java.io.File; +import java.security.Provider; +import java.security.Security; +import java.util.List; +import java.util.LinkedList; + +/** + * QPID-1394 : Test to ensure that the client can register their custom JCAProviders after the broker to ensure that + * the Qpid custom authentication SASL plugins are used. + */ +public class MultipleJCAProviderRegistrationTest extends QpidBrokerTestCase +{ + + public void setUp() throws Exception + { + _broker = VM; + + super.setUp(); + } + + public void test() throws Exception + { + // Get the providers before connection + Provider[] providers = Security.getProviders(); + + // Force the client to load the providers + getConnection(); + + Provider[] afterConnectionCreation = Security.getProviders(); + + // Find the additions + List additions = new LinkedList(); + for (Provider afterCreation : afterConnectionCreation) + { + boolean found = false; + for (Provider provider : providers) + { + if (provider == afterCreation) + { + found=true; + break; + } + } + + // Record added registies + if (!found) + { + additions.add(afterCreation); + } + } + + assertTrue("Client did not register any providers", additions.size() > 0); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java index bf96dae02e..004ce5ea8f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java @@ -27,7 +27,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import javax.jms.Connection; import javax.jms.JMSException; @@ -52,7 +51,6 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase private Session _session; MessageConsumer _consumer; MessageProducer _producer; - UUID myUUID = UUID.randomUUID(); public void setUp() throws Exception { @@ -121,8 +119,7 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase m.setFloat("Float", Integer.MAX_VALUE + 5000); m.setInt("Int", Integer.MAX_VALUE - 5000); m.setShort("Short", (short)58); - m.setString("String", "Hello"); - m.setObject("uuid", myUUID); + m.setString("String", "Hello"); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -143,7 +140,6 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int")); assertEquals((short)58,m.getShort("Short")); assertEquals("Hello",m.getString("String")); - assertEquals(myUUID,(UUID)m.getObject("uuid")); } @@ -153,11 +149,7 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase List<Integer> myList = getList(); - m.setObject("List", myList); - - List<UUID> uuidList = new ArrayList<UUID>(); - uuidList.add(myUUID); - m.setObject("uuid-list", uuidList); + m.setObject("List", myList); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -175,10 +167,6 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals(i,j.intValue()); i++; } - - List<UUID> list2 = (List<UUID>)msg.getObject("uuid-list"); - assertNotNull("UUID List not received",list2); - assertEquals(myUUID,list2.get(0)); } public void testMessageWithMapEntries() throws JMSException @@ -186,12 +174,8 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase MapMessage m = _session.createMapMessage(); Map<String,String> myMap = getMap(); - m.setObject("Map", myMap); - - Map<String,UUID> uuidMap = new HashMap<String,UUID>(); - uuidMap.put("uuid", myUUID); - m.setObject("uuid-map", uuidMap); + m.setObject("Map", myMap); _producer.send(m); AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT); @@ -207,10 +191,6 @@ public class AMQPEncodedMapMessageTest extends QpidBrokerTestCase assertEquals("String" + i,map.get("Key" + i)); i++; } - - Map<String,UUID> map2 = (Map<String,UUID>)msg.getObject("uuid-map"); - assertNotNull("Map not received",map2); - assertEquals(myUUID,map2.get("uuid")); } public void testMessageWithNestedListsAndMaps() throws JMSException diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java deleted file mode 100644 index d7ee203fdf..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.jms.xa; - -import javax.jms.XAConnection; -import javax.jms.XAConnectionFactory; -import javax.jms.XASession; -import javax.transaction.xa.XAResource; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.FileUtils; - -public class XAResourceTest extends QpidBrokerTestCase -{ - - private static final String FACTORY_NAME = "default"; - private static final String ALT_FACTORY_NAME = "connection2"; - - /* - * Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal, - * as they originate from the same session. - */ - public void testIsSameRMSingleCF() throws Exception - { - XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); - XAConnection conn = factory.createXAConnection(); - XASession session = conn.createXASession(); - XAResource xaResource1 = session.getXAResource(); - XAResource xaResource2 = session.getXAResource(); - - assertEquals("XAResource objects not equal", xaResource1, xaResource2); - assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); - - session.close(); - conn.close(); - } - - /* - * Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be - * equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true. - * - */ - public void testIsSameRMMultiCF() throws Exception - { - startBroker(FAILING_PORT); - ConnectionURL url = getConnectionFactory(FACTORY_NAME).getConnectionURL(); - XAConnectionFactory factory = new AMQConnectionFactory(url); - XAConnectionFactory factory2 = new AMQConnectionFactory(url); - XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME); - - XAConnection conn = factory.createXAConnection(); - XAConnection conn2 = factory2.createXAConnection(); - XAConnection conn3 = factory3.createXAConnection(); - - XASession session = conn.createXASession(); - XASession session2 = conn2.createXASession(); - XASession session3 = conn3.createXASession(); - - XAResource xaResource1 = session.getXAResource(); - XAResource xaResource2 = session2.getXAResource(); - XAResource xaResource3 = session3.getXAResource(); - - assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2)); - assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); - assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3)); - - conn.close(); - conn2.close(); - conn3.close(); - } - - @Override - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } - } - - @Override - public void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - // Ensure we shutdown any secondary brokers - stopBroker(FAILING_PORT); - FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); - } - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java deleted file mode 100644 index 9839c6e475..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -/** - * Test enabling generation of message statistics on a per-connection basis. - */ -public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - // no statistics generation configured - } - - /** - * Test statistics on a single connection - */ - public void testEnablingStatisticsPerConnection() throws Exception - { - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - List<String> addresses = new ArrayList<String>(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - - addresses.add(mc.getRemoteAddress()); - } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - - Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - test.start(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - continue; - } - mc.setStatisticsEnabled(true); - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - } - - sendUsing(test, 5, 200); - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - else - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - test.close(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java deleted file mode 100644 index 383c4c00a8..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -/** - * Test enabling generation of message statistics on a per-connection basis. - */ -public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker"))); - setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost"))); - setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection"))); - } - - /** - * Just broker statistics. - */ - public void testGenerateBrokerStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Just virtualhost statistics. - */ - public void testGenerateVirtualhostStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Just connection statistics. - */ - public void testGenerateConnectionStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Both broker and virtualhost statistics. - */ - public void testGenerateBrokerAndVirtualhostStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Broker, virtualhost and connection statistics. - */ - public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java deleted file mode 100644 index e657856d0e..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -/** - * Test statistics for delivery and receipt. - */ -public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", "true"); - setConfigurationProperty("statistics.generation.virtualhosts", "true"); - setConfigurationProperty("statistics.generation.connections", "true"); - } - - public void testDeliveryAndReceiptStatistics() throws Exception - { - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - List<String> addresses = new ArrayList<String>(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); - - addresses.add(mc.getRemoteAddress()); - } - - assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered()); - assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered()); - assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); - - Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - test.start(); - receiveUsing(test, 5); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); - } - else - { - assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived()); - } - } - assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered()); - assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered()); - assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); - - test.close(); - } - - protected void receiveUsing(Connection con, int number) throws Exception - { - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - createQueue(session); - MessageConsumer consumer = session.createConsumer(_queue); - for (int i = 0; i < number; i++) - { - Message msg = consumer.receive(100); - assertNotNull("Message " + i + " was not received", msg); - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java deleted file mode 100644 index 180440c0d6..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import java.util.List; - -import org.apache.qpid.util.LogMonitor; - -/** - * Test generation of message statistics reporting. - */ -public class MessageStatisticsReportingTest extends MessageStatisticsTestCase -{ - protected LogMonitor _monitor; - - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", "true"); - setConfigurationProperty("statistics.generation.virtualhosts", "true"); - - if (getName().equals("testEnabledStatisticsReporting")) - { - setConfigurationProperty("statistics.reporting.period", "10"); - } - - _monitor = new LogMonitor(_outputFile); - } - - /** - * Test enabling reporting. - */ - public void testEnabledStatisticsReporting() throws Exception - { - sendUsing(_test, 10, 100); - sendUsing(_dev, 20, 100); - sendUsing(_local, 15, 100); - - Thread.sleep(10 * 1000); // 15s - - List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); - List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); - List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); - List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); - - assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size()); - assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size()); - assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size()); - assertEquals("Incorrect number of virtualhost message stats log messages", 6, vhostStatsMessages.size()); - } - - /** - * Test not enabling reporting. - */ - public void testNotEnabledStatisticsReporting() throws Exception - { - sendUsing(_test, 10, 100); - sendUsing(_dev, 20, 100); - sendUsing(_local, 15, 100); - - Thread.sleep(10 * 1000); // 15s - - List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); - List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); - List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); - List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); - - assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size()); - assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size()); - assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size()); - assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size()); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java deleted file mode 100644 index 824ae41b97..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import javax.jms.Connection; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -/** - * Test generation of message statistics. - */ -public class MessageStatisticsTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", "true"); - setConfigurationProperty("statistics.generation.virtualhosts", "true"); - setConfigurationProperty("statistics.generation.connections", "true"); - } - - /** - * Test message totals. - */ - public void testMessageTotals() throws Exception - { - sendUsing(_test, 10, 100); - sendUsing(_dev, 20, 100); - sendUsing(_local, 5, 100); - sendUsing(_local, 5, 100); - sendUsing(_local, 5, 100); - Thread.sleep(2000); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - ManagedBroker local = _jmxUtils.getManagedBroker("localhost"); - - if (!isBroker010()) - { - long total = 0; - long data = 0; - for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) - { - total += mc.getTotalMessagesReceived(); - data += mc.getTotalDataReceived(); - } - assertEquals("Incorrect connection total", 45, total); - assertEquals("Incorrect connection data", 4500, data); - } - assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived()); - - if (!isBroker010()) - { - long testTotal = 0; - long testData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - testTotal += mc.getTotalMessagesReceived(); - testData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 10, testTotal); - assertEquals("Incorrect test connection data", 1000, testData); - } - assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived()); - - if (!isBroker010()) - { - long devTotal = 0; - long devData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("development")) - { - devTotal += mc.getTotalMessagesReceived(); - devData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 20, devTotal); - assertEquals("Incorrect test connection data", 2000, devData); - } - assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived()); - assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived()); - - if (!isBroker010()) - { - long localTotal = 0; - long localData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost")) - { - localTotal += mc.getTotalMessagesReceived(); - localData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 15, localTotal); - assertEquals("Incorrect test connection data", 1500, localData); - } - assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived()); - assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived()); - } - - /** - * Test message totals when a connection is closed. - */ - public void testMessageTotalsWithClosedConnections() throws Exception - { - Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - temp.start(); - - sendUsing(_test, 10, 100); - sendUsing(temp, 10, 100); - sendUsing(_test, 10, 100); - Thread.sleep(2000); - - temp.close(); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - - if (!isBroker010()) - { - long total = 0; - long data = 0; - for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) - { - total += mc.getTotalMessagesReceived(); - data += mc.getTotalDataReceived(); - } - assertEquals("Incorrect active connection total", 20, total); - assertEquals("Incorrect active connection data", 2000, data); - } - assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived()); - - if (!isBroker010()) - { - long testTotal = 0; - long testData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - testTotal += mc.getTotalMessagesReceived(); - testData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test active connection total", 20, testTotal); - assertEquals("Incorrect test active connection data", 20 * 100, testData); - } - assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived()); - } - - /** - * Test message peak rate generation. - */ - public void testMessagePeakRates() throws Exception - { - sendUsing(_test, 2, 10); - Thread.sleep(10000); - sendUsing(_dev, 4, 10); - Thread.sleep(10000); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - - assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate()); - assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate()); - assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate()); - assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate()); - - assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate()); - assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate()); - } - - /** - * Test message totals when a vhost has its statistics reset - */ - public void testMessageTotalVhostReset() throws Exception - { - sendUsing(_test, 10, 10); - sendUsing(_dev, 10, 10); - Thread.sleep(2000); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - - assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); - - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); - - test.resetStatistics(); - - assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); - - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java deleted file mode 100644 index a5b3aa283c..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -/** - * Test generation of message statistics. - */ -public abstract class MessageStatisticsTestCase extends QpidBrokerTestCase -{ - protected static final String USER = "admin"; - - protected JMXTestUtils _jmxUtils; - protected Connection _test, _dev, _local; - protected String _queueName = "statistics"; - protected Destination _queue; - protected String _brokerUrl; - - @Override - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this, USER, USER); - _jmxUtils.setUp(); - - configureStatistics(); - - super.setUp(); - - _brokerUrl = getBroker().toString(); - _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development"); - _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost"); - - _test.start(); - _dev.start(); - _local.start(); - - _jmxUtils.open(); - } - - protected void createQueue(Session session) throws AMQException, JMSException - { - _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName); - if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue)) - { - ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null); - ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName)); - } - } - - - @Override - public void tearDown() throws Exception - { - _jmxUtils.close(); - - _test.close(); - _dev.close(); - _local.close(); - - super.tearDown(); - } - - /** - * Configure statistics generation properties on the broker. - */ - public abstract void configureStatistics() throws Exception; - - protected void sendUsing(Connection con, int number, int size) throws Exception - { - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - createQueue(session); - MessageProducer producer = session.createProducer(_queue); - String content = new String(new byte[size]); - TextMessage msg = session.createTextMessage(content); - for (int i = 0; i < number; i++) - { - producer.send(msg); - } - } - - /** - * Asserts that the actual value is within the expected value plus or - * minus the given error. - */ - public void assertApprox(String message, double error, double expected, double actual) - { - double min = expected * (1.0d - error); - double max = expected * (1.0d + error); - String assertion = String.format("%s: expected %f +/- %d%%, actual %f", - message, expected, (int) (error * 100.0d), actual); - assertTrue(assertion, actual > min && actual < max); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java index 27b4de0a8e..f9227c53ba 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java @@ -70,13 +70,13 @@ public class BrokerStartupTest extends AbstractTestLogging { // This logging startup code only occurs when you run a Java broker, // that broker must be started via Main so not an InVM broker. - if (isJavaBroker() && isExternalBroker() && !isInternalBroker()) + if (isJavaBroker() && isExternalBroker()) { //Remove test Log4j config from the commandline - _brokerCommand = _brokerCommand.substring(0, _brokerCommand.indexOf("-l")); + _broker = _broker.substring(0, _broker.indexOf("-l")); // Add an invalid value - _brokerCommand += " -l invalid"; + _broker += " -l invalid"; // The broker has a built in default log4j configuration set up // so if the the broker cannot load the -l value it will use default diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java index 6d379e14d8..d4c550bc08 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/configuration/ServerConfigurationFileTest.java @@ -61,6 +61,21 @@ public class ServerConfigurationFileTest extends QpidBrokerTestCase _serverConfig.getConfig().getProperty(property)); } + public void testProtectIOEnabled() throws ConfigurationException + { + validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED); + } + + public void testProtectIOReadBufferLimitSize() throws ConfigurationException + { + validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE); + } + + public void testProtectIOWriteBufferLimitSize() throws ConfigurationException + { + validatePropertyDefinedInFile(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE); + } + public void testStatusUpdates() throws ConfigurationException { validatePropertyDefinedInFile(ServerConfiguration.STATUS_UPDATES); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java index 4a92f04b30..2d89d319d7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java @@ -83,9 +83,15 @@ public class MessagingTestConfigProperties /** Holds the name of the default connection factory configuration property. */ public static final String CONNECTION_PROPNAME = "connectionfactory.broker"; + /** Defeins the default connection configuration. */ + public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'"; + /** Holds the name of the property to get the test broker url from. */ public static final String BROKER_PROPNAME = "qpid.test.broker"; + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "vm://:1"; + /** Holds the name of the property to get the test broker virtual path. */ public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; @@ -268,6 +274,7 @@ public class MessagingTestConfigProperties static { defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT); + defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT); defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT); defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT); @@ -277,6 +284,7 @@ public class MessagingTestConfigProperties defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT); defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java index 11db513e00..ec222ff03d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java @@ -20,39 +20,58 @@ */ package org.apache.qpid.server.failover; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.ExceptionListener; -import javax.jms.JMSException; +import junit.framework.TestCase; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import java.util.concurrent.CountDownLatch; + +public class FailoverMethodTest extends InternalBrokerBaseCase implements ExceptionListener { private CountDownLatch _failoverComplete = new CountDownLatch(1); protected static final Logger _logger = LoggerFactory.getLogger(FailoverMethodTest.class); + @Override + public void createBroker() throws Exception + { + super.createBroker(); + TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); + } + @Override + public void stopBroker() + { + TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); + super.stopBroker(); + } /** * Test that the round robin method has the correct delays. - * The first connection will work but the localhost connection should fail but the duration it takes + * The first connection to vm://:1 will work but the localhost connection should fail but the duration it takes * to report the failure is what is being tested. * + * @throws URLSyntaxException + * @throws InterruptedException + * @throws JMSException */ - public void testFailoverRoundRobinDelay() throws Exception + public void testFailoverRoundRobinDelay() throws URLSyntaxException, InterruptedException, JMSException { - //note: The first broker has no connect delay and the default 1 retry + //note: The VM broker has no connect delay and the default 1 retry // while the tcp:localhost broker has 3 retries with a 2s connect delay String connectionString = "amqp://guest:guest@/test?brokerlist=" + - "'tcp://:" + getPort() + + "'vm://:" + ApplicationRegistry.DEFAULT_INSTANCE + ";tcp://localhost:5670?connectdelay='2000',retries='3''"; AMQConnectionURL url = new AMQConnectionURL(connectionString); @@ -66,9 +85,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL stopBroker(); - _failoverComplete.await(30, TimeUnit.SECONDS); - assertEquals("failoverLatch was not decremented in given timeframe", - 0, _failoverComplete.getCount()); + _failoverComplete.await(); long end = System.currentTimeMillis(); @@ -95,9 +112,10 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL } } - public void testFailoverSingleDelay() throws Exception + public void testFailoverSingleDelay() throws URLSyntaxException, AMQVMBrokerCreationException, + InterruptedException, JMSException { - String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getPort() + "?connectdelay='2000',retries='3''"; + String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='2000',retries='3''"; AMQConnectionURL url = new AMQConnectionURL(connectionString); @@ -110,9 +128,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL stopBroker(); - _failoverComplete.await(30, TimeUnit.SECONDS); - assertEquals("failoverLatch was not decremented in given timeframe", - 0, _failoverComplete.getCount()); + _failoverComplete.await(); long end = System.currentTimeMillis(); @@ -144,10 +160,6 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL _logger.debug("Received AMQDisconnectedException"); _failoverComplete.countDown(); } - else - { - _logger.error("Unexpected underlying exception", e.getLinkedException()); - } } /** @@ -156,37 +168,28 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL * * Test validates that there is a connection delay as required on initial * connection. + * + * @throws URLSyntaxException + * @throws AMQVMBrokerCreationException + * @throws InterruptedException + * @throws JMSException */ - public void testNoFailover() throws Exception + public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException, + InterruptedException, JMSException { - if (!isInternalBroker()) - { - // QPID-3359 - // These tests always used to be inVM tests, then QPID-2815, with removal of ivVM, - // converted the test to use QpidBrokerTestCase. However, since then we notice this - // test fails on slower CI boxes. It turns out the test design is *extremely* - // sensitive the length of time the broker takes to start up. - // - // Making the test a same-VM test to temporarily avoid the issue. In long term, test - // needs redesigned to avoid the issue. - return; - } - int CONNECT_DELAY = 2000; - String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getPort() + "?connectdelay='" + CONNECT_DELAY + "'," + + String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='" + CONNECT_DELAY + "'," + "retries='3'',failover='nofailover'"; - AMQConnectionURL url = new AMQConnectionURL(connectionString); - Thread brokerStart = null; try { //Kill initial broker stopBroker(); //Create a thread to start the broker asynchronously - brokerStart = new Thread(new Runnable() + Thread brokerStart = new Thread(new Runnable() { public void run() { @@ -195,7 +198,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL //Wait before starting broker // The wait should allow atleast 1 retries to fail before broker is ready Thread.sleep(750); - startBroker(); + createBroker(); } catch (Exception e) { @@ -209,6 +212,7 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL brokerStart.start(); long start = System.currentTimeMillis(); + //Start the connection so it will use the retries AMQConnection connection = new AMQConnection(url, null); @@ -224,16 +228,13 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL //Ensure we collect the brokerStart thread brokerStart.join(); - brokerStart = null; start = System.currentTimeMillis(); //Kill connection stopBroker(); - _failoverComplete.await(30, TimeUnit.SECONDS); - assertEquals("failoverLatch was not decremented in given timeframe", - 0, _failoverComplete.getCount()); + _failoverComplete.await(); end = System.currentTimeMillis(); @@ -248,23 +249,6 @@ public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionL { fail(e.getMessage()); } - finally - { - // Guard against the case where the broker took too long to start - // and the initial connection failed to be formed. - if (brokerStart != null) - { - brokerStart.join(); - } - } - } - - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java index 58b2edfee2..05aaf16af1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java @@ -176,7 +176,7 @@ public class AlertingTest extends AbstractTestLogging startBroker(); - if (isInternalBroker()) + if (!isExternalBroker()) { assertEquals("Alert Max Msg Count is not correct", 5, ApplicationRegistry.getInstance().getVirtualHostRegistry(). getVirtualHost(VIRTUALHOST).getQueueRegistry().getQueue(new AMQShortString(_destination.getQueueName())). diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java index 9155b84365..8fd2c085c3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java @@ -21,8 +21,6 @@ package org.apache.qpid.server.logging; import junit.framework.AssertionFailedError; - -import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.Main; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.util.LogMonitor; @@ -153,12 +151,12 @@ public class BrokerLoggingTest extends AbstractTestLogging { // This logging startup code only occurs when you run a Java broker, // that broker must be started via Main so not an InVM broker. - if (isJavaBroker() && isExternalBroker() && !isInternalBroker()) + if (isJavaBroker() && isExternalBroker()) { String TESTID = "BRK-1007"; //Remove test Log4j config from the commandline - _brokerCommand = _brokerCommand.substring(0, _brokerCommand.indexOf("-l")); + _broker = _broker.substring(0, _broker.indexOf("-l")); startBroker(); @@ -205,7 +203,7 @@ public class BrokerLoggingTest extends AbstractTestLogging 1, findMatches(TESTID).size()); //3 - String defaultLog4j = System.getProperty(QPID_HOME) + "/" + BrokerOptions.DEFAULT_LOG_CONFIG_FILE; + String defaultLog4j = _configFile.getParent() + "/" + Main.DEFAULT_LOG_CONFIG_FILENAME; assertTrue("Log4j file(" + defaultLog4j + ") details not correctly logged:" + getMessageString(log), getMessageString(log).endsWith(defaultLog4j)); @@ -242,11 +240,12 @@ public class BrokerLoggingTest extends AbstractTestLogging */ public void testBrokerStartupCustomLog4j() throws Exception { - // This logging startup code only occurs when you run a Java broker + // This logging startup code only occurs when you run a Java broker, + // that broker must be started via Main so not an InVM broker. if (isJavaBroker() && isExternalBroker()) { // Get custom -l value used during testing for the broker startup - String customLog4j = _brokerCommand.substring(_brokerCommand.indexOf("-l") + 2); + String customLog4j = _broker.substring(_broker.indexOf("-l") + 2); String TESTID = "BRK-1007"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java index 1b2ec9c092..02d0d6f334 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -77,7 +77,7 @@ public class ChannelLoggingTest extends AbstractTestLogging validateMessageID("CHN-1001", log); assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); - if (!isBroker010()) + if (isBroker08()) { // Wait to ensure that the CHN-1004 message is logged waitForMessage("CHN-1004"); @@ -89,7 +89,7 @@ public class ChannelLoggingTest extends AbstractTestLogging log = getLogMessage(results, 0); // MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} validateMessageID("CHN-1004", log); - assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log))); + assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log))); assertTrue("Prefetch Count not correct",getMessageString(fromMessage(log)).endsWith("Count "+PREFETCH)); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java index 9feca7279e..595c0d5f35 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java @@ -76,8 +76,9 @@ public class ManagementLoggingTest extends AbstractTestLogging */ public void testManagementStartupEnabled() throws Exception { - // This test only works on java brokers - if (isJavaBroker()) + // This test only works on external java brokers due to the fact that + // Management is disabled on InVM brokers. + if (isJavaBroker() && isExternalBroker()) { startBrokerAndCreateMonitor(true, false); @@ -129,7 +130,9 @@ public class ManagementLoggingTest extends AbstractTestLogging */ public void testManagementStartupDisabled() throws Exception { - if (isJavaBroker()) + // This test only works on external java brokers due to the fact that + // Management is disabled on InVM brokers. + if (isJavaBroker() && isExternalBroker()) { startBrokerAndCreateMonitor(false, false); @@ -188,7 +191,9 @@ public class ManagementLoggingTest extends AbstractTestLogging */ public void testManagementStartupRMIEntries() throws Exception { - if (isJavaBroker()) + // This test only works on external java brokers due to the fact that + // Management is disabled on InVM brokers. + if (isJavaBroker() && isExternalBroker()) { startBrokerAndCreateMonitor(true, false); @@ -245,7 +250,9 @@ public class ManagementLoggingTest extends AbstractTestLogging */ public void testManagementStartupSSLKeystore() throws Exception { - if (isJavaBroker()) + // This test only works on external java brokers due to the fact that + // Management is disabled on InVM brokers. + if (isJavaBroker() && isExternalBroker()) { startBrokerAndCreateMonitor(true, true); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java index 398c83a8d8..a5aec3edce 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -78,10 +78,21 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase implements Conn BrokerDetails details = _connectionURL.getBrokerDetails(0); - // This will attempt to failover for 3 seconds. - // Local testing suggests failover takes 2 seconds - details.setProperty(BrokerDetails.OPTIONS_RETRY, "10"); - details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500"); + // Due to the problem with SingleServer delaying on all connection + // attempts. So using a high retry value. + if (_broker.equals(VM)) + { + // Local testing suggests InVM restart takes under a second + details.setProperty(BrokerDetails.OPTIONS_RETRY, "5"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "200"); + } + else + { + // This will attempt to failover for 3 seconds. + // Local testing suggests failover takes 2 seconds + details.setProperty(BrokerDetails.OPTIONS_RETRY, "10"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "500"); + } super.setUp(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java deleted file mode 100644 index 460270e188..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class MultipleTransactedBatchProducerTest extends QpidBrokerTestCase -{ - private static final Logger _logger = Logger.getLogger(MultipleTransactedBatchProducerTest.class); - - private static final int MESSAGE_COUNT = 1000; - private static final int BATCH_SIZE = 50; - private static final int NUM_PRODUCERS = 2; - private static final int NUM_CONSUMERS = 3; - private static final Random RANDOM = new Random(); - - private CountDownLatch _receivedLatch; - private String _queueName; - - private volatile String _failMsg; - - public void setUp() throws Exception - { - //debug level logging often makes this test pass artificially, turn the level down to info. - setSystemProperty("amqj.server.logging.level", "INFO"); - _receivedLatch = new CountDownLatch(MESSAGE_COUNT * NUM_PRODUCERS); - setConfigurationProperty("management.enabled", "true"); - super.setUp(); - _queueName = getTestQueueName(); - _failMsg = null; - } - - /** - * When there are multiple producers submitting batches of messages to a given - * queue using transacted sessions, it is highly probable that concurrent - * enqueue() activity will occur and attempt delivery of their message to the - * same subscription. In this scenario it is likely that one of the attempts - * will succeed and the other will result in use of the deliverAsync() method - * to start a queue Runner and ensure delivery of the message. - * - * A defect within the processQueue() method used by the Runner would mean that - * delivery of these messages may not occur, should the Runner stop before all - * messages have been processed. Such a defect was discovered and found to be - * most visible when Selectors are used such that one and only one subscription - * can/will accept any given message, but multiple subscriptions are present, - * and one of the earlier subscriptions receives more messages than the others. - * - * This test is to validate that the processQueue() method is able to correctly - * deliver all of the messages present for asynchronous delivery to subscriptions, - * by utilising multiple batch transacted producers to create the scenario and - * ensure all messages are received by a consumer. - */ - public void testMultipleBatchedProducersWithMultipleConsumersUsingSelectors() throws Exception - { - String selector1 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 0"); - String selector2 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 1"); - String selector3 = ("(\"" + _queueName +"\" % " + NUM_CONSUMERS + ") = 2"); - - //create consumers - Connection conn1 = getConnection(); - conn1.setExceptionListener(new ExceptionHandler("conn1")); - Session sess1 = conn1.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer cons1 = sess1.createConsumer(sess1.createQueue(_queueName), selector1); - cons1.setMessageListener(new Cons(sess1,"consumer1")); - - Connection conn2 = getConnection(); - conn2.setExceptionListener(new ExceptionHandler("conn2")); - Session sess2 = conn2.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer cons2 = sess2.createConsumer(sess2.createQueue(_queueName), selector2); - cons2.setMessageListener(new Cons(sess2,"consumer2")); - - Connection conn3 = getConnection(); - conn3.setExceptionListener(new ExceptionHandler("conn3")); - Session sess3 = conn3.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer cons3 = sess3.createConsumer(sess3.createQueue(_queueName), selector3); - cons3.setMessageListener(new Cons(sess3,"consumer3")); - - conn1.start(); - conn2.start(); - conn3.start(); - - //create producers - Connection connA = getConnection(); - connA.setExceptionListener(new ExceptionHandler("connA")); - Connection connB = getConnection(); - connB.setExceptionListener(new ExceptionHandler("connB")); - Thread producer1 = new Thread(new ProducerThread(connA, _queueName, "producer1")); - Thread producer2 = new Thread(new ProducerThread(connB, _queueName, "producer2")); - - producer1.start(); - Thread.sleep(10); - producer2.start(); - - //await delivery of the messages - boolean result = _receivedLatch.await(75, TimeUnit.SECONDS); - - assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg); - assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(), - result); - - } - - @Override - public Message createNextMessage(Session session, int msgCount) throws JMSException - { - Message message = super.createNextMessage(session,msgCount); - - //bias at least 50% of the messages to the first consumers selector because - //the issue presents itself primarily when an earlier subscription completes - //delivery after the later subscriptions - int val; - if (msgCount % 2 == 0) - { - val = 0; - } - else - { - val = RANDOM.nextInt(Integer.MAX_VALUE); - } - - message.setIntProperty(_queueName, val); - - return message; - } - - private class Cons implements MessageListener - { - private Session _sess; - private String _desc; - - public Cons(Session sess, String desc) - { - _sess = sess; - _desc = desc; - } - - public void onMessage(Message message) - { - _receivedLatch.countDown(); - int msgCount = 0; - int msgID = 0; - try - { - msgCount = message.getIntProperty(INDEX); - msgID = message.getIntProperty(_queueName); - } - catch (JMSException e) - { - _logger.error(_desc + " received exception: " + e.getMessage(), e); - failAsyncTest(e.getMessage()); - } - - _logger.info("Consumer received message:"+ msgCount + " with ID: " + msgID); - - try - { - _sess.commit(); - } - catch (JMSException e) - { - _logger.error(_desc + " received exception: " + e.getMessage(), e); - failAsyncTest(e.getMessage()); - } - } - } - - private class ProducerThread implements Runnable - { - private Connection _conn; - private String _dest; - private String _desc; - - public ProducerThread(Connection conn, String dest, String desc) - { - _conn = conn; - _dest = dest; - _desc = desc; - } - - public void run() - { - try - { - Session session = _conn.createSession(true, Session.SESSION_TRANSACTED); - sendMessage(session, session.createQueue(_dest), MESSAGE_COUNT, BATCH_SIZE); - } - catch (Exception e) - { - _logger.error(_desc + " received exception: " + e.getMessage(), e); - failAsyncTest(e.getMessage()); - } - } - } - - private class ExceptionHandler implements javax.jms.ExceptionListener - { - private String _desc; - - public ExceptionHandler(String description) - { - _desc = description; - } - - public void onException(JMSException e) - { - _logger.error(_desc + " received exception: " + e.getMessage(), e); - failAsyncTest(e.getMessage()); - } - } - - private void failAsyncTest(String msg) - { - _logger.error("Failing test because: " + msg); - _failMsg = msg; - } -}
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 2ce1251eab..6203e8a194 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import junit.framework.Assert; import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java index 7f8f71d965..f845ff1214 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java @@ -52,6 +52,7 @@ import org.apache.qpid.url.URLSyntaxException; * * TODO move the pre broker-startup setup method invocation code to {@link QpidBrokerTestCase} * + * @see SimpleACLTest * @see ExternalACLTest * @see ExternalACLFileTest * @see ExternalACLJMXTest @@ -64,7 +65,10 @@ public abstract class AbstractACLTestCase extends QpidBrokerTestCase implements protected CountDownLatch _exceptionReceived; /** Override this to return the name of the configuration XML file. */ - public abstract String getConfig(); + public String getConfig() + { + return "config-systests-acl.xml"; + } /** Override this to setup external ACL files for virtual hosts. */ public List<String> getHostList() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java index d1ba725721..4603cc1862 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLTest.java @@ -18,620 +18,11 @@ */ package org.apache.qpid.server.security.acl; -import java.io.IOException; import java.util.Arrays; import java.util.List; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import javax.naming.NamingException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; - -/** - * Tests the V2 ACLs. The tests perform basic AMQP operations like creating queues or excahnges and publishing and consuming messages, using - * JMS to contact the broker. - */ -public class ExternalACLTest extends AbstractACLTestCase +public class ExternalACLTest extends SimpleACLTest { - public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - conn.start(); - - //Do something to show connection is active. - sess.rollback(); - - conn.close(); - } - catch (Exception e) - { - fail("Connection was not created due to:" + e); - } - } - - public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception - { - //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so - //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming - //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given - //this right in the 'test2' vhost. - - try - { - //get a connection to the 'test2' vhost using the guest user and perform various actions. - Connection conn = getConnection("test2", "guest", "guest"); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - conn.start(); - - //create Queues and consumers for each - Queue namedQueue = sess.createQueue("vhostAccessCreatedQueue" + getTestQueueName()); - Queue tempQueue = sess.createTemporaryQueue(); - MessageConsumer consumer = sess.createConsumer(namedQueue); - MessageConsumer tempConsumer = sess.createConsumer(tempQueue); - - //send a message to each queue (also causing an exchange declare) - MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null); - ((org.apache.qpid.jms.MessageProducer) sender).send(namedQueue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); - ((org.apache.qpid.jms.MessageProducer) sender).send(tempQueue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); - - //consume the messages from the queues - consumer.receive(2000); - tempConsumer.receive(2000); - - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testAccessNoRightsFailure() throws Exception - { - try - { - Connection conn = getConnection("test", "guest", "guest"); - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - conn.start(); - sess.rollback(); - - fail("Connection was created."); - } - catch (JMSException e) - { - // JMSException -> linkedException -> cause = AMQException (403 or 320) - Exception linkedException = e.getLinkedException(); - assertNotNull("There was no linked exception", linkedException); - Throwable cause = linkedException.getCause(); - assertNotNull("Cause was null", cause); - assertTrue("Wrong linked exception type", cause instanceof AMQException); - AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED; - assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode()); - } - } - - public void testClientDeleteQueueSuccess() throws Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - conn.start(); - - // create kipper - Topic kipper = sess.createTopic("kipper"); - TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); - - subscriber.close(); - sess.unsubscribe("kipper"); - - //Do something to show connection is active. - sess.rollback(); - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testServerDeleteQueueFailure() throws Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - conn.start(); - - // create kipper - Topic kipper = sess.createTopic("kipper"); - TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); - - subscriber.close(); - sess.unsubscribe("kipper"); - - //Do something to show connection is active. - sess.rollback(); - conn.close(); - } - catch (JMSException e) - { - // JMSException -> linedException = AMQException.403 - check403Exception(e.getLinkedException()); - } - } - - public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - sess.createConsumer(sess.createTemporaryQueue()); - - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - sess.createConsumer(sess.createQueue("IllegalQueue")); - - fail("Test failed as consumer was created."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - //Create Temporary Queue - can't use the createTempQueue as QueueName is null. - ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("doesnt_matter_as_autodelete_means_tmp"), - true, false, false); - - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - //Create a Named Queue - ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false); - - fail("Test failed as Queue creation succeded."); - //conn will be automatically closed - } - catch (AMQException e) - { - check403Exception(e); - } - } - - public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); - - conn.start(); - - MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue")); - - sender.send(sess.createTextMessage("test")); - - //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker. - sess.commit(); - - conn.close(); - } - catch (Exception e) - { - fail("Test publish failed:" + e); - } - } - - public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null); - - Queue queue = sess.createQueue("example.RequestQueue"); - - // Send a message that we will wait to be sent, this should give the broker time to process the msg - // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not - // queue existence. - ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sess.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); - - conn.close(); - } - catch (Exception e) - { - fail("Test publish failed:" + e); - } - } - - public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null); - - Queue queue = session.createQueue("Invalid"); - - // Send a message that we will wait to be sent, this should give the broker time to close the connection - // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not - // queue existence. - ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); - - // Test the connection with a valid consumer - // This may fail as the session may be closed before the queue or the consumer created. - Queue temp = session.createTemporaryQueue(); - - session.createConsumer(temp).close(); - - //Connection should now be closed and will throw the exception caused by the above send - conn.close(); - - fail("Close is not expected to succeed."); - } - catch (IllegalStateException e) - { - _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - sess.createConsumer(sess.createQueue("example.RequestQueue")); - - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception - { - try - { - Connection conn = getConnection("test", "client", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - sess.createConsumer(sess.createQueue("Invalid")); - - fail("Test failed as consumer was created."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - sess.createConsumer(sess.createTemporaryQueue()); - - fail("Test failed as consumer was created."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - //Create Temporary Queue - ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("example.RequestQueue"), false, false, false); - - conn.close(); - } - catch (Exception e) - { - fail("Test failed due to:" + e.getMessage()); - } - } - - public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - //Create a Named Queue - ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false); - - fail("Test failed as creation succeded."); - } - catch (Exception e) - { - check403Exception(e); - } - } - - public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - session.createTemporaryQueue(); - - fail("Test failed as creation succeded."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception - { - try - { - Connection connection = getConnection("test", "server", "guest"); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - connection.start(); - - ((AMQSession<?, ?>) session).createQueue(new AMQShortString("again_ensure_auto_delete_queue_for_temporary"), - true, false, false); - - fail("Test failed as creation succeded."); - } - catch (Exception e) - { - check403Exception(e); - } - } - - /** - * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue. - * The reason the client must be involved is that the Server is unable to create its own Temporary Queues. - * - * @throws AMQException - * @throws URLSyntaxException - * @throws JMSException - */ - public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception - { - //Set up the Server - Connection serverConnection = getConnection("test", "server", "guest"); - - Session serverSession = serverConnection.createSession(true, Session.SESSION_TRANSACTED); - - Queue requestQueue = serverSession.createQueue("example.RequestQueue"); - - MessageConsumer server = serverSession.createConsumer(requestQueue); - - serverConnection.start(); - - //Set up the consumer - Connection clientConnection = getConnection("test", "client", "guest"); - - //Send a test mesage - Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue responseQueue = clientSession.createTemporaryQueue(); - - MessageConsumer clientResponse = clientSession.createConsumer(responseQueue); - - clientConnection.start(); - - Message request = clientSession.createTextMessage("Request"); - - assertNotNull("Response Queue is null", responseQueue); - - request.setJMSReplyTo(responseQueue); - - clientSession.createProducer(requestQueue).send(request); - - try - { - Message msg = null; - - msg = server.receive(2000); - - while (msg != null && !((TextMessage) msg).getText().equals("Request")) - { - msg = server.receive(2000); - } - - assertNotNull("Message not received", msg); - - assertNotNull("Reply-To is Null", msg.getJMSReplyTo()); - - MessageProducer sender = serverSession.createProducer(msg.getJMSReplyTo()); - - sender.send(serverSession.createTextMessage("Response")); - - //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker. - serverSession.commit(); - - //Ensure Response is received. - Message clientResponseMsg = clientResponse.receive(2000); - assertNotNull("Client did not receive response message,", clientResponseMsg); - assertEquals("Incorrect message received", "Response", ((TextMessage) clientResponseMsg).getText()); - - } - catch (Exception e) - { - fail("Test publish failed:" + e); - } - finally - { - try - { - serverConnection.close(); - } - finally - { - clientConnection.close(); - } - } - } - - public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception - { - try - { - Connection conn = getConnection("test", "server", "guest"); - - ((AMQConnection) conn).setConnectionListener(this); - - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - conn.start(); - - MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null); - - Queue queue = session.createQueue("Invalid"); - - // Send a message that we will wait to be sent, this should give the broker time to close the connection - // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not - // queue existence. - ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), - DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); - - // Test the connection with a valid consumer - // This may not work as the session may be closed before the queue or consumer creation can occur. - // The correct JMSexception with linked error will only occur when the close method is recevied whilst in - // the failover safe block - session.createConsumer(session.createQueue("example.RequestQueue")).close(); - - //Connection should now be closed and will throw the exception caused by the above send - conn.close(); - - fail("Close is not expected to succeed."); - } - catch (IllegalStateException e) - { - _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error."); - } - catch (JMSException e) - { - check403Exception(e.getLinkedException()); - } - } - - @Override public String getConfig() { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java new file mode 100644 index 0000000000..a50817e659 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.security.acl; + +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import javax.naming.NamingException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Basic access control list tests. + * + * These tests require an access control security plugin to be configured in the broker, and carry out various broker + * operations that will succeed or fail depending on the user and virtual host. See the {@code config-systests-acl-setup.xml} + * configuration file for the SimpleXML version of the ACLs used by the Java broker only, or the various {@code .txt} + * files in the system tests directory for the external version 3 ACL files used by both the Java and C++ brokers. + * <p> + * This class can be extended and the {@link #getConfig()} method overridden to run the same tests with a different type + * of access control mechanism. Extension classes should differ only in the configuration file used, but extra tests can be + * added that are specific to a particular configuration. + * <p> + * The tests perform basic AMQP operations like creating queues or excahnges and publishing and consuming messages, using + * JMS to contact the broker. + * + * @see ExternalACLTest + */ +public class SimpleACLTest extends AbstractACLTestCase +{ + public void testAccessAuthorizedSuccess() throws AMQException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); + conn.start(); + + //Do something to show connection is active. + sess.rollback(); + + conn.close(); + } + catch (Exception e) + { + fail("Connection was not created due to:" + e); + } + } + + public void testAccessVhostAuthorisedGuestSuccess() throws IOException, Exception + { + //The 'guest' user has no access to the 'test' vhost, as tested below in testAccessNoRights(), and so + //is unable to perform actions such as connecting (and by extension, creating a queue, and consuming + //from a queue etc). In order to test the vhost-wide 'access' ACL right, the 'guest' user has been given + //this right in the 'test2' vhost. + + try + { + //get a connection to the 'test2' vhost using the guest user and perform various actions. + Connection conn = getConnection("test2", "guest", "guest"); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + + //create Queues and consumers for each + Queue namedQueue = sess.createQueue("vhostAccessCreatedQueue" + getTestQueueName()); + Queue tempQueue = sess.createTemporaryQueue(); + MessageConsumer consumer = sess.createConsumer(namedQueue); + MessageConsumer tempConsumer = sess.createConsumer(tempQueue); + + //send a message to each queue (also causing an exchange declare) + MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null); + ((org.apache.qpid.jms.MessageProducer) sender).send(namedQueue, sess.createTextMessage("test"), + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + ((org.apache.qpid.jms.MessageProducer) sender).send(tempQueue, sess.createTextMessage("test"), + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + + //consume the messages from the queues + consumer.receive(2000); + tempConsumer.receive(2000); + + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testAccessNoRightsFailure() throws Exception + { + try + { + Connection conn = getConnection("test", "guest", "guest"); + Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); + conn.start(); + sess.rollback(); + + fail("Connection was created."); + } + catch (JMSException e) + { + // JMSException -> linkedException -> cause = AMQException (403 or 320) + Exception linkedException = e.getLinkedException(); + assertNotNull("There was no linked exception", linkedException); + Throwable cause = linkedException.getCause(); + assertNotNull("Cause was null", cause); + assertTrue("Wrong linked exception type", cause instanceof AMQException); + AMQConstant errorCode = isBroker010() ? AMQConstant.CONTEXT_IN_USE : AMQConstant.ACCESS_REFUSED; + assertEquals("Incorrect error code received", errorCode, ((AMQException) cause).getErrorCode()); + } + } + + public void testClientDeleteQueueSuccess() throws Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); + conn.start(); + + // create kipper + Topic kipper = sess.createTopic("kipper"); + TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); + + subscriber.close(); + sess.unsubscribe("kipper"); + + //Do something to show connection is active. + sess.rollback(); + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testServerDeleteQueueFailure() throws Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); + conn.start(); + + // create kipper + Topic kipper = sess.createTopic("kipper"); + TopicSubscriber subscriber = sess.createDurableSubscriber(kipper, "kipper"); + + subscriber.close(); + sess.unsubscribe("kipper"); + + //Do something to show connection is active. + sess.rollback(); + conn.close(); + } + catch (JMSException e) + { + // JMSException -> linedException = AMQException.403 + check403Exception(e.getLinkedException()); + } + } + + public void testClientConsumeFromTempQueueSuccess() throws AMQException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sess.createConsumer(sess.createTemporaryQueue()); + + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testClientConsumeFromNamedQueueFailure() throws NamingException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sess.createConsumer(sess.createQueue("IllegalQueue")); + + fail("Test failed as consumer was created."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } + + public void testClientCreateTemporaryQueueSuccess() throws JMSException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + //Create Temporary Queue - can't use the createTempQueue as QueueName is null. + ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("doesnt_matter_as_autodelete_means_tmp"), + true, false, false); + + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testClientCreateNamedQueueFailure() throws NamingException, JMSException, AMQException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + //Create a Named Queue + ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false); + + fail("Test failed as Queue creation succeded."); + //conn will be automatically closed + } + catch (AMQException e) + { + check403Exception(e); + } + } + + public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(true, Session.SESSION_TRANSACTED); + + conn.start(); + + MessageProducer sender = sess.createProducer(sess.createQueue("example.RequestQueue")); + + sender.send(sess.createTextMessage("test")); + + //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker. + sess.commit(); + + conn.close(); + } + catch (Exception e) + { + fail("Test publish failed:" + e); + } + } + + public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + MessageProducer sender = ((AMQSession<?, ?>) sess).createProducer(null); + + Queue queue = sess.createQueue("example.RequestQueue"); + + // Send a message that we will wait to be sent, this should give the broker time to process the msg + // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not + // queue existence. + ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sess.createTextMessage("test"), + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + + conn.close(); + } + catch (Exception e) + { + fail("Test publish failed:" + e); + } + } + + public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null); + + Queue queue = session.createQueue("Invalid"); + + // Send a message that we will wait to be sent, this should give the broker time to close the connection + // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not + // queue existence. + ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + + // Test the connection with a valid consumer + // This may fail as the session may be closed before the queue or the consumer created. + Queue temp = session.createTemporaryQueue(); + + session.createConsumer(temp).close(); + + //Connection should now be closed and will throw the exception caused by the above send + conn.close(); + + fail("Close is not expected to succeed."); + } + catch (IllegalStateException e) + { + _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } + + public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sess.createConsumer(sess.createQueue("example.RequestQueue")); + + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception + { + try + { + Connection conn = getConnection("test", "client", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sess.createConsumer(sess.createQueue("Invalid")); + + fail("Test failed as consumer was created."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } + + public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sess.createConsumer(sess.createTemporaryQueue()); + + fail("Test failed as consumer was created."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } + + public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + //Create Temporary Queue + ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("example.RequestQueue"), false, false, false); + + conn.close(); + } + catch (Exception e) + { + fail("Test failed due to:" + e.getMessage()); + } + } + + public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + //Create a Named Queue + ((AMQSession<?, ?>) sess).createQueue(new AMQShortString("IllegalQueue"), false, false, false); + + fail("Test failed as creation succeded."); + } + catch (Exception e) + { + check403Exception(e); + } + } + + public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + session.createTemporaryQueue(); + + fail("Test failed as creation succeded."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } + + public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception + { + try + { + Connection connection = getConnection("test", "server", "guest"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + + ((AMQSession<?, ?>) session).createQueue(new AMQShortString("again_ensure_auto_delete_queue_for_temporary"), + true, false, false); + + fail("Test failed as creation succeded."); + } + catch (Exception e) + { + check403Exception(e); + } + } + + /** + * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue. + * The reason the client must be involved is that the Server is unable to create its own Temporary Queues. + * + * @throws AMQException + * @throws URLSyntaxException + * @throws JMSException + */ + public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception + { + //Set up the Server + Connection serverConnection = getConnection("test", "server", "guest"); + + Session serverSession = serverConnection.createSession(true, Session.SESSION_TRANSACTED); + + Queue requestQueue = serverSession.createQueue("example.RequestQueue"); + + MessageConsumer server = serverSession.createConsumer(requestQueue); + + serverConnection.start(); + + //Set up the consumer + Connection clientConnection = getConnection("test", "client", "guest"); + + //Send a test mesage + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue responseQueue = clientSession.createTemporaryQueue(); + + MessageConsumer clientResponse = clientSession.createConsumer(responseQueue); + + clientConnection.start(); + + Message request = clientSession.createTextMessage("Request"); + + assertNotNull("Response Queue is null", responseQueue); + + request.setJMSReplyTo(responseQueue); + + clientSession.createProducer(requestQueue).send(request); + + try + { + Message msg = null; + + msg = server.receive(2000); + + while (msg != null && !((TextMessage) msg).getText().equals("Request")) + { + msg = server.receive(2000); + } + + assertNotNull("Message not received", msg); + + assertNotNull("Reply-To is Null", msg.getJMSReplyTo()); + + MessageProducer sender = serverSession.createProducer(msg.getJMSReplyTo()); + + sender.send(serverSession.createTextMessage("Response")); + + //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker. + serverSession.commit(); + + //Ensure Response is received. + Message clientResponseMsg = clientResponse.receive(2000); + assertNotNull("Client did not receive response message,", clientResponseMsg); + assertEquals("Incorrect message received", "Response", ((TextMessage) clientResponseMsg).getText()); + + } + catch (Exception e) + { + fail("Test publish failed:" + e); + } + finally + { + try + { + serverConnection.close(); + } + finally + { + clientConnection.close(); + } + } + } + + public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception + { + try + { + Connection conn = getConnection("test", "server", "guest"); + + ((AMQConnection) conn).setConnectionListener(this); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + MessageProducer sender = ((AMQSession<?, ?>) session).createProducer(null); + + Queue queue = session.createQueue("Invalid"); + + // Send a message that we will wait to be sent, this should give the broker time to close the connection + // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not + // queue existence. + ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"), + DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true); + + // Test the connection with a valid consumer + // This may not work as the session may be closed before the queue or consumer creation can occur. + // The correct JMSexception with linked error will only occur when the close method is recevied whilst in + // the failover safe block + session.createConsumer(session.createQueue("example.RequestQueue")).close(); + + //Connection should now be closed and will throw the exception caused by the above send + conn.close(); + + fail("Close is not expected to succeed."); + } + catch (IllegalStateException e) + { + _logger.info("QPID-2345: Session became closed and we got that error rather than the authentication error."); + } + catch (JMSException e) + { + check403Exception(e.getLinkedException()); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java index 2d99a44532..f40e95885d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/firewall/FirewallConfigTest.java @@ -85,6 +85,12 @@ public class FirewallConfigTest extends QpidBrokerTestCase public void testVhostAllowBrokerDeny() throws Exception { + if (_broker.equals(VM)) + { + //No point running this test with an InVM broker as the + //firewall plugin only functions for TCP connections. + return; + } _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-2.xml"); @@ -119,6 +125,13 @@ public class FirewallConfigTest extends QpidBrokerTestCase public void testVhostDenyBrokerAllow() throws Exception { + if (_broker.equals(VM)) + { + //No point running this test with an InVM broker as the + //firewall plugin only functions for TCP connections. + return; + } + _configFile = new File(System.getProperty("QPID_HOME"), "etc/config-systests-firewall-3.xml"); super.setUp(); @@ -264,6 +277,11 @@ public class FirewallConfigTest extends QpidBrokerTestCase private void testFirewall(boolean initial, boolean inVhost, Runnable restartOrReload) throws Exception { + if (_broker.equals(VM)) + { + // No point running this test in a vm broker + return; + } writeFirewallFile(initial, inVhost); setConfigurationProperty("management.enabled", String.valueOf(true)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index b5bb74327e..51589c705f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -25,37 +25,25 @@ import java.util.Collections; import java.util.HashMap; import java.util.Hashtable; import java.util.Map; -import java.util.Properties; - import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueReceiver; -import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; import javax.naming.Context; -import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.ExecutionErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -199,7 +187,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); } - + + // todo add tests for delete options + public void testCreateQueue() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -212,7 +202,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "durable: true ," + "x-declare: " + "{" + - "exclusive: true," + + "auto-delete: true," + "arguments: {" + "'qpid.max_size': 1000," + "'qpid.max_count': 100" + @@ -228,9 +218,6 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}"; AMQDestination dest = new AMQAnyDestination(addr); MessageConsumer cons = jmsSession.createConsumer(dest); - cons.close(); - - // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); @@ -259,44 +246,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); - MessageProducer prod = jmsSession.createProducer(dest); - prod.send(jmsSession.createTextMessage("test")); - - MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue")); - Message m = cons2.receive(1000); - assertNotNull("Should receive message sent to my-queue",m); - assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT)); } public void testCreateExchange() throws Exception { - createExchangeImpl(false, false); - } - - /** - * Verify creating an exchange via an Address, with supported - * exchange-declare arguments. - */ - public void testCreateExchangeWithArgs() throws Exception - { - createExchangeImpl(true, false); - } - - /** - * Verify that when creating an exchange via an Address, if a - * nonsense argument is specified the broker throws an execution - * exception back on the session with NOT_IMPLEMENTED status. - */ - public void testCreateExchangeWithNonsenseArgs() throws Exception - { - createExchangeImpl(true, true); - } - - private void createExchangeImpl(final boolean withExchangeArgs, - final boolean useNonsenseArguments) throws Exception - { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-exchange/hello; " + "{ " + "create: always, " + @@ -306,36 +261,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "x-declare: " + "{ " + "type:direct, " + - "auto-delete: true" + - createExchangeArgsString(withExchangeArgs, useNonsenseArguments) + + "auto-delete: true, " + + "arguments: {" + + "'qpid.msg_sequence': 1, " + + "'qpid.ive': 1" + + "}" + "}" + "}" + "}"; AMQDestination dest = new AMQAnyDestination(addr); - - MessageConsumer cons; - try - { - cons = jmsSession.createConsumer(dest); - if(useNonsenseArguments) - { - fail("Expected execution exception during exchange declare did not occur"); - } - } - catch(JMSException e) - { - if(useNonsenseArguments && e.getCause().getMessage().contains(ExecutionErrorCode.NOT_IMPLEMENTED.toString())) - { - //expected because we used an argument which the broker doesn't have functionality - //for. We can't do the rest of the test as a result of the exception, just stop. - return; - } - else - { - fail("Unexpected exception whilst creating consumer: " + e); - } - } + MessageConsumer cons = jmsSession.createConsumer(dest); assertTrue("Exchange not created as expected",( (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true)); @@ -350,66 +286,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); } - private String createExchangeArgsString(final boolean withExchangeArgs, - final boolean useNonsenseArguments) - { - String argsString; - - if(withExchangeArgs && useNonsenseArguments) - { - argsString = ", arguments: {" + - "'abcd.1234.wxyz': 1, " + - "}"; - } - else if(withExchangeArgs) - { - argsString = ", arguments: {" + - "'qpid.msg_sequence': 1, " + - "'qpid.ive': 1" + - "}"; - } - else - { - argsString = ""; - } - - return argsString; - } - - public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception - { - assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); - - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); - - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); - - Address a = Address.parse(headersBinding); - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", - dest.getAddressName(),null, a.getOptions())); - } - - /** - * Test goal: Verifies that a producer and consumer creation triggers the correct - * behavior for x-bindings specified in node props. - */ public void testBindQueueWithArgs() throws Exception { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; - String addr = "node: " + + String addr = "ADDR:my-queue/hello; " + + "{ " + + "create: always, " + + "node: " + "{" + "durable: true ," + "x-declare: " + @@ -424,14 +310,28 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}" + "}"; + AMQDestination dest = new AMQAnyDestination(addr); + MessageConsumer cons = jmsSession.createConsumer(dest); + + assertTrue("Queue not created as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("", + dest.getAddressName(),dest.getAddressName(), null)); - AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr); - MessageConsumer cons = jmsSession.createConsumer(dest1); - checkQueueForBindings(jmsSession,dest1,headersBinding); + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + dest.getAddressName(),"test", null)); + + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + dest.getAddressName(),"a.#", null)); - AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr); - MessageProducer prod = jmsSession.createProducer(dest2); - checkQueueForBindings(jmsSession,dest2,headersBinding); + Address a = Address.parse(headersBinding); + assertTrue("Queue not bound as expected",( + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + dest.getAddressName(),null, a.getOptions())); } /** @@ -567,6 +467,39 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } /** + * Test goal: Verifies that and address based destination can be used successfully + * as a reply to. + */ + public void testAddressBasedReplyTo() throws Exception + { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + + String addr = "ADDR:amq.direct/x512; {create: receiver, " + + "link : {name : 'MY.RESP.QUEUE', " + + "x-declare : { auto-delete: true, exclusive: true, " + + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }"; + + Destination replyTo = new AMQAnyDestination(addr); + Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello"); + + MessageConsumer cons = jmsSession.createConsumer(dest); + MessageProducer prod = jmsSession.createProducer(dest); + Message m = jmsSession.createTextMessage("Hello"); + m.setJMSReplyTo(replyTo); + prod.send(m); + + Message msg = cons.receive(1000); + assertNotNull("consumer should have received the message",msg); + + MessageConsumer replyToCons = jmsSession.createConsumer(replyTo); + MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo()); + replyToProd.send(jmsSession.createTextMessage("reply")); + + Message replyToMsg = replyToCons.receive(1000); + assertNotNull("The reply to consumer should have got the message",replyToMsg); + } + + /** * Test goal: Verifies that session.createQueue method * works as expected both with the new and old addressing scheme. */ @@ -587,22 +520,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons.close(); // Using the ADDR method - // default case queue = ssn.createQueue("ADDR:my-queue2"); - try - { - prod = ssn.createProducer(queue); - fail("The client should throw an exception, since there is no queue present in the broker"); - } - catch(Exception e) - { - String s = "The name 'my-queue2' supplied in the address " + - "doesn't resolve to an exchange or a queue"; - assertEquals(s,e.getCause().getCause().getMessage()); - } - - // explicit create case - queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( @@ -629,25 +547,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } /** - * Test goal: Verifies that session.creatTopic method works as expected - * both with the new and old addressing scheme. + * Test goal: Verifies that session.creatTopic method + * works as expected both with the new and old addressing scheme. */ public void testSessionCreateTopic() throws Exception { - sessionCreateTopicImpl(false); - } - - /** - * Test goal: Verifies that session.creatTopic method works as expected - * both with the new and old addressing scheme when adding exchange arguments. - */ - public void testSessionCreateTopicWithExchangeArgs() throws Exception - { - sessionCreateTopicImpl(true); - } - - private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception - { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // Using the BURL method @@ -667,7 +571,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + String addr = "ADDR:vehicles/bus; " + "{ " + "create: always, " + @@ -677,8 +581,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "x-declare: " + "{ " + "type:direct, " + - "auto-delete: true" + - createExchangeArgsString(withExchangeArgs, false) + + "auto-delete: true, " + + "arguments: {" + + "'qpid.msg_sequence': 1, " + + "'qpid.ive': 1" + + "}" + "}" + "}, " + "link: {name : my-topic, " + @@ -790,7 +697,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSubscriptionForSameDestination() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}"); + Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); @@ -889,297 +796,4 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { } } - - public void testQueueReceiversAndTopicSubscriber() throws Exception - { - Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); - Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); - - QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - QueueReceiver receiver = qSession.createReceiver(queue); - - TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = tSession.createSubscriber(topic); - - Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); - prod1.send(ssn.createTextMessage("test1")); - - MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); - prod2.send(ssn.createTextMessage("test2")); - - Message msg1 = receiver.receive(); - assertNotNull(msg1); - assertEquals("test1",((TextMessage)msg1).getText()); - - Message msg2 = sub.receive(); - assertNotNull(msg2); - assertEquals("test2",((TextMessage)msg2).getText()); - } - - public void testDurableSubscriber() throws Exception - { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - - Properties props = new Properties(); - props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); - props.setProperty("destination.address1", "ADDR:amq.topic"); - props.setProperty("destination.address2", "ADDR:amq.direct/test"); - String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," + - "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; - props.setProperty("destination.address3", addrStr); - props.setProperty("topic.address4", "hello.world"); - addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; - props.setProperty("destination.address5", addrStr); - - Context ctx = new InitialContext(props); - - for (int i=1; i < 5; i++) - { - Topic topic = (Topic) ctx.lookup("address"+i); - createDurableSubscriber(ctx,ssn,"address"+i,topic); - } - - Topic topic = ssn.createTopic("ADDR:news.us"); - createDurableSubscriber(ctx,ssn,"my-dest",topic); - - Topic namedQueue = (Topic) ctx.lookup("address5"); - try - { - createDurableSubscriber(ctx,ssn,"my-queue",namedQueue); - fail("Exception should be thrown. Durable subscribers cannot be created for Queues"); - } - catch(JMSException e) - { - assertEquals("Durable subscribers can only be created for Topics", - e.getMessage()); - } - } - - private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception - { - MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); - MessageProducer prod = ssn.createProducer(topic); - - Message m = ssn.createTextMessage(destName); - prod.send(m); - Message msg = cons.receive(1000); - assertNotNull(msg); - assertEquals(destName,((TextMessage)msg).getText()); - ssn.unsubscribe(destName); - } - - public void testDeleteOptions() throws Exception - { - Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons; - - // default (create never, assert never) ------------------- - // create never -------------------------------------------- - String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; - AMQDestination dest = new AMQAnyDestination(addr1); - try - { - cons = jmsSession.createConsumer(dest); - cons.close(); - } - catch(JMSException e) - { - fail("Exception should not be thrown. Exception thrown is : " + e); - } - - assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - - String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; - dest = new AMQAnyDestination(addr2); - try - { - cons = jmsSession.createConsumer(dest); - cons.close(); - } - catch(JMSException e) - { - fail("Exception should not be thrown. Exception thrown is : " + e); - } - - assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - - String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; - dest = new AMQAnyDestination(addr3); - try - { - cons = jmsSession.createConsumer(dest); - MessageProducer prod = jmsSession.createProducer(dest); - prod.close(); - } - catch(JMSException e) - { - fail("Exception should not be thrown. Exception thrown is : " + e); - } - - assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - - } - - /** - * Test Goals : 1. Test if the client sets the correct accept mode for unreliable - * and at-least-once. - * 2. Test default reliability modes for Queues and Topics. - * 3. Test if an exception is thrown if exactly-once is used. - * 4. Test if an exception is thrown if at-least-once is used with topics. - * - * Test Strategy: For goal #1 & #2 - * For unreliable and at-least-once the test tries to receives messages - * in client_ack mode but does not ack the messages. - * It will then close the session, recreate a new session - * and will then try to verify the queue depth. - * For unreliable the messages should have been taken off the queue. - * For at-least-once the messages should be put back onto the queue. - * - */ - - public void testReliabilityOptions() throws Exception - { - String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; - acceptModeTest(addr1,0); - - String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; - acceptModeTest(addr2,2); - - // Default accept-mode for topics - acceptModeTest("ADDR:amq.topic/test",0); - - // Default accept-mode for queues - acceptModeTest("ADDR:testQueue1;{create: always}",2); - - String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; - try - { - AMQAnyDestination dest = new AMQAnyDestination(addr3); - fail("An exception should be thrown indicating it's an unsupported type"); - } - catch(Exception e) - { - assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); - } - - String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; - try - { - AMQAnyDestination dest = new AMQAnyDestination(addr4); - Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons = ssn.createConsumer(dest); - fail("An exception should be thrown indicating it's an unsupported combination"); - } - catch(Exception e) - { - assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); - } - } - - private void acceptModeTest(String address, int expectedQueueDepth) throws Exception - { - Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons; - MessageProducer prod; - - AMQDestination dest = new AMQAnyDestination(address); - cons = ssn.createConsumer(dest); - prod = ssn.createProducer(dest); - - for (int i=0; i < expectedQueueDepth; i++) - { - prod.send(ssn.createTextMessage("Msg" + i)); - } - - for (int i=0; i < expectedQueueDepth; i++) - { - Message msg = cons.receive(1000); - assertNotNull(msg); - assertEquals("Msg" + i,((TextMessage)msg).getText()); - } - - ssn.close(); - ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); - assertEquals(expectedQueueDepth,queueDepth); - cons.close(); - prod.close(); - } - - public void testDestinationOnSend() throws Exception - { - Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test")); - MessageProducer prod = ssn.createProducer(null); - - Queue queue = ssn.createQueue("amq.topic/test"); - prod.send(queue,ssn.createTextMessage("A")); - - Message msg = cons.receive(1000); - assertNotNull(msg); - assertEquals("A",((TextMessage)msg).getText()); - prod.close(); - cons.close(); - } - - public void testReplyToWithNamelessExchange() throws Exception - { - System.setProperty("qpid.declare_exchanges","false"); - replyToTest("ADDR:my-queue;{create: always}"); - System.setProperty("qpid.declare_exchanges","true"); - } - - public void testReplyToWithCustomExchange() throws Exception - { - replyToTest("ADDR:hello;{create:always,node:{type:topic}}"); - } - - private void replyToTest(String replyTo) throws Exception - { - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination replyToDest = AMQDestination.createDestination(replyTo); - MessageConsumer replyToCons = session.createConsumer(replyToDest); - - Destination dest = session.createQueue("amq.direct/test"); - - MessageConsumer cons = session.createConsumer(dest); - MessageProducer prod = session.createProducer(dest); - Message m = session.createTextMessage("test"); - m.setJMSReplyTo(replyToDest); - prod.send(m); - - Message msg = cons.receive(); - MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); - prodR.send(session.createTextMessage("x")); - - Message m1 = replyToCons.receive(); - assertNotNull("The reply to consumer should have received the messsage",m1); - } - - public void testAltExchangeInAddressString() throws Exception - { - String addr1 = "ADDR:my-exchange/test; {create: always, node:{type: topic,x-declare:{alternate-exchange:'amq.fanout'}}}"; - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - String altQueueAddr = "ADDR:my-alt-queue;{create: always, delete: receiver,node:{x-bindings:[{exchange:'amq.fanout'}] }}"; - MessageConsumer cons = session.createConsumer(session.createQueue(altQueueAddr)); - - MessageProducer prod = session.createProducer(session.createTopic(addr1)); - prod.send(session.createMessage()); - prod.close(); - assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); - - String addr2 = "ADDR:test-queue;{create:sender, delete: sender,node:{type:queue,x-declare:{alternate-exchange:'amq.fanout'}}}"; - prod = session.createProducer(session.createTopic(addr2)); - prod.send(session.createMessage()); - prod.close(); - assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); - cons.close(); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index b1c8b5682f..49a608190d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.test.client.message; import java.util.concurrent.CountDownLatch; import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; @@ -31,7 +30,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.jms.TextMessage; import junit.framework.Assert; @@ -52,6 +50,7 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener private AMQConnection _connection; private AMQDestination _destination; private int count; + public String _connectionString = "vm://:1"; private static final String INVALID_SELECTOR = "Cost LIKE 5"; CountDownLatch _responseLatch = new CountDownLatch(1); @@ -281,36 +280,31 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener Assert.assertNotNull("Msg5 should not be null", msg5); } - public void testSelectorWithJMSDeliveryMode() throws Exception + public static void main(String[] argv) throws Exception { - Session session = _connection.createSession(false, Session.SESSION_TRANSACTED); + SelectorTest test = new SelectorTest(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; - Destination dest1 = session.createTopic("test1"); - Destination dest2 = session.createTopic("test2"); - - MessageProducer prod1 = session.createProducer(dest1); - MessageProducer prod2 = session.createProducer(dest2); - MessageConsumer consumer1 = session.createConsumer(dest1,"JMSDeliveryMode = 'PERSISTENT'"); - MessageConsumer consumer2 = session.createConsumer(dest2,"JMSDeliveryMode = 'NON_PERSISTENT'"); - - Message msg1 = session.createTextMessage("Persistent"); - prod1.send(msg1); - prod2.send(msg1); - - prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - prod2.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - Message msg2 = session.createTextMessage("Non_Persistent"); - prod1.send(msg2); - prod2.send(msg2); - - TextMessage m1 = (TextMessage)consumer1.receive(1000); - assertEquals("Consumer1 should receive the persistent message","Persistent",m1.getText()); - assertNull("Consumer1 should not receiver another message",consumer1.receive(1000)); - - TextMessage m2 = (TextMessage)consumer2.receive(1000); - assertEquals("Consumer2 should receive the non persistent message","Non_Persistent",m2.getText()); - assertNull("Consumer2 should not receiver another message",consumer2.receive(1000)); + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.testUsingOnMessage(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/AMQPFeatureDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/AMQPFeatureDecorator.java new file mode 100644 index 0000000000..c11f75e742 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/AMQPFeatureDecorator.java @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.qpid; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.LocalAMQPCircuitFactory; + +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; + +/** + * AMQPFeatureDecorator applies decorations to {@link FrameworkBaseCase} tests, so that they may use Qpid/AMQP specific + * features, not available through JMS. For example, the immediate and mandatory flags. This decorator replaces the + * standard test circuit factory on the base class with one that allows these features to be used. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Substitute the circuit factory with an AMQP/Qpid specific one. + * </table> + * + * @todo This wrapper substitutes in a LocalAMQPCircuitFactory, which is fine for local tests. For distributed tests + * the Fanout or Interop factories are substituted in by their decorators instead. These actually use + * distributed circuit static create methods to build the circuits, which should actually be changed to a factory, + * so that static methods do not need to be used. The distributed circuit creater delegates the circuit + * construction to remote test nodes. This decorator should not be used with distributed tests, or should be made + * aware of them, in which case it might ensure that an AMQP feature (implied already by other properties) flag + * is passed out to the remote test nodes, and provide a mechansim for them to decorate their circuit creation + * with AMQP features too. Add factory substituion/decoration mechansim for test clients, here or in a seperate + * class. + */ +public class AMQPFeatureDecorator extends WrappedSuiteTestDecorator +{ + /** The test suite to run. */ + private Test test; + + /** + * Creates a wrapped test test decorator from another one. + * + * @param test The test test. + */ + public AMQPFeatureDecorator(WrappedSuiteTestDecorator test) + { + super(test); + this.test = test; + } + + /** + * Runs the tests with a LocalAMQPCircuitFactory. Only tests that extend FrameworkBaseCase are decorated. + * + * @param testResult The the results object to monitor the test results with. + */ + public void run(TestResult testResult) + { + for (Test test : getAllUnderlyingTests()) + { + if (test instanceof FrameworkBaseCase) + { + FrameworkBaseCase frameworkTest = (FrameworkBaseCase) test; + frameworkTest.setCircuitFactory(new LocalAMQPCircuitFactory()); + } + } + + // Run the test. + test.run(testResult); + } + + /** + * Prints the name of the test for debugging purposes. + * + * @return The name of the test. + */ + public String toString() + { + return "AMQPFeatureDecorator: [test = \"" + test + "\"]"; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureDecorator.java new file mode 100644 index 0000000000..2708253d86 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureDecorator.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.qpid; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.qpid.test.framework.BrokerLifecycleAware; +import org.apache.qpid.test.framework.CauseFailureUserPrompt; + +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; + +/** + * CauseFailureDecorator applies decorations to {@link BrokerLifecycleAware} tests, so that they may use different failure + * mechanisms. It is capable of detecting when a test case uses in-vm brokers, and setting up an automatic failure + * for those tests, so that the current live broker can be shut-down by test cases. For external brokers, automatic + * failure could be implemented, for example by having a kill script. At the moment this sets up the failure to prompt + * a user interactively to cause a failure, using {@link CauseFailureUserPrompt}. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Setup automatic failures for in-vm brokers. <td> {@link CauseFailureInVM} + * <tr><td> Setup user generated failures for external brokers. <td> {@link CauseFailureUserPrompt}. + * <tr><td> + * </table> + * + * @todo Slight problem in that CauseFailureInVM is Qpid specific, whereas CauseFailureUserPrompt is not. Would like the + * failure decorator to be non-qpid specific so that it can test failure of any JMS implementation too. Either pass + * in class name of failure mechanism, set it up in the in-vm decorator instead of here but with prompt user as the + * default for when the in-vm decorator is not used? + */ +public class CauseFailureDecorator extends WrappedSuiteTestDecorator +{ + /** The test suite to run. */ + private Test test; + + /** + * Creates a wrapped test test decorator from another one. + * + * @param test The test test. + */ + public CauseFailureDecorator(WrappedSuiteTestDecorator test) + { + super(test); + this.test = test; + } + + /** + * Runs the tests with a LocalAMQPCircuitFactory. Only tests that extend FrameworkBaseCase are decorated. + * + * @param testResult The the results object to monitor the test results with. + */ + public void run(TestResult testResult) + { + for (Test test : getAllUnderlyingTests()) + { + if (test instanceof BrokerLifecycleAware) + { + BrokerLifecycleAware failureTest = (BrokerLifecycleAware) test; + failureTest.setFailureMechanism(new CauseFailureUserPrompt()); + } + } + + // Run the test. + test.run(testResult); + } + + /** + * Prints the name of the test for debugging purposes. + * + * @return The name of the test. + */ + public String toString() + { + return "CauseFailureDecorator: [test = \"" + test + "\"]"; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java new file mode 100644 index 0000000000..3e03ad0872 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.qpid; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.framework.CauseFailure; +import org.apache.qpid.test.framework.BrokerLifecycleAware; + +/** + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Cause messaging broker failure on the active in-vm broker. + * <td> {@link TransportConnection}, {@link ApplicationRegistry} + * </table> + */ +public class CauseFailureInVM implements CauseFailure +{ + /** Holds the in-vm broker instrumented test case to create failures for. */ + private BrokerLifecycleAware inVMTest; + + /** + * Creates an automated failure mechanism for testing against in-vm brokers. The test to create the mechanism + * for is specified, and as this failure is for in-vm brokers, the test must be {@link org.apache.qpid.test.framework.BrokerLifecycleAware}. The test + * must also report that it is currently being run against an in-vm broker, and it is a runtime error if it is not, + * as the creator of this failure mechanism should already have checked that it is. + * + * @param inVMTest The test case to create an automated failure mechanism for. + */ + public CauseFailureInVM(BrokerLifecycleAware inVMTest) + { + // Check that the test is really using in-vm brokers. + if (!inVMTest.usingInVmBroker()) + { + throw new RuntimeException( + "Cannot create in-vm broker failure mechanism for a test that is not using in-vm brokers."); + } + + this.inVMTest = inVMTest; + } + + /** + * Causes the active message broker to fail. + */ + public void causeFailure() + { + int liveBroker = inVMTest.getLiveBroker(); + + TransportConnection.killVMBroker(liveBroker); + ApplicationRegistry.remove(liveBroker); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java new file mode 100644 index 0000000000..b92a72a654 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.framework.qpid; + +import junit.framework.Test; +import junit.framework.TestResult; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.framework.BrokerLifecycleAware; +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import org.apache.qpid.junit.extensions.SetupTaskAware; +import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; + +/** + * InVMBrokerDecorator is a test decorator, that is activated when running tests against an in-vm broker only. Its + * purpose is to automatically create, and close and delete an in-vm broker, during the set-up and tear-down of + * each test case. This decorator may only be used in conjunction with tests that extend {@link FrameworkBaseCase}. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create/Destroy an in-vm broker on every test run. + * </table> + * + * @todo May need to add a more fine grained injection point for the in-vm broker management, as this acts at the + * suite level, rather than the individual test level. + * + * @todo Management of in-vm brokers for failure testing. Failure test setups may need to set their connection url to + * use multiple broker (vm://:1;vm://:2), with fail-over between them. There is round-robin fail-over, but also + * retry? A test case using an in-vm broker needs to record which one it is using, so that it can be + * killed/restarted. + */ +public class InVMBrokerDecorator extends WrappedSuiteTestDecorator +{ + /** The test suite to run. */ + private Test test; + + /** + * Creates a wrapped test suite decorator from another one. + * + * @param test The test suite. + */ + public InVMBrokerDecorator(WrappedSuiteTestDecorator test) + { + super(test); + this.test = test; + } + + /** + * Runs the tests with in-vm broker creation and clean-up added to the tests task stack. + * + * @param testResult The the results object to monitor the test results with. + */ + public void run(TestResult testResult) + { + for (Test test : getAllUnderlyingTests()) + { + // Check that the test to have an in-vm broker setup/teardown task added to it, is actually a framework + // test that can handle setup tasks. + if ((test instanceof SetupTaskAware)) + { + SetupTaskAware frameworkTest = (SetupTaskAware) test; + + frameworkTest.chainSetupTask(new Runnable() + { + public void run() + { + // Ensure that the in-vm broker is created. + try + { + ApplicationRegistry.getInstance(1); + TransportConnection.createVMBroker(1); + } + catch (AMQVMBrokerCreationException e) + { + throw new RuntimeException("In-VM broker creation failed: " + e.getMessage(), e); + } + } + }); + + frameworkTest.chainTearDownTask(new Runnable() + { + public void run() + { + // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. + TransportConnection.killVMBroker(1); + ApplicationRegistry.remove(1); + } + }); + + // Check if the test is aware whether or not it can control the broker life cycle, and if so provide + // additional instrumentation for it to control the in-vm broker through. + if (test instanceof BrokerLifecycleAware) + { + BrokerLifecycleAware inVMTest = (BrokerLifecycleAware) test; + inVMTest.setInVmBrokers(); + inVMTest.setLiveBroker(1); + inVMTest.setFailureMechanism(new CauseFailureInVM(inVMTest)); + } + } + } + + // Run the test. + test.run(testResult); + } + + /** + * Prints the name of the test for debugging purposes. + * + * @return The name of the test. + */ + public String toString() + { + return "InVMBrokerDecorator: [test = " + test + "]"; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java index 23efb656d2..4b45a96c20 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java @@ -23,7 +23,7 @@ package org.apache.qpid.test.unit.ack; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.FailoverBaseCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -32,7 +32,7 @@ import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; -public class Acknowledge2ConsumersTest extends QpidBrokerTestCase +public class Acknowledge2ConsumersTest extends FailoverBaseCase { protected static int NUM_MESSAGES = 100; protected Connection _con; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java index 13c78c1e14..6c83136511 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.test.unit.ack; -import java.util.concurrent.CountDownLatch; +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 87eae32cf8..3a5f676ca6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -23,6 +23,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 481b144caf..292bcd6039 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.unit.client; +import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.InputStreamReader; import java.io.LineNumberReader; @@ -36,9 +37,11 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.TopicSession; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionDelegate_0_10; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -228,8 +231,7 @@ public class AMQConnectionTest extends QpidBrokerTestCase } MessageConsumer consumerB = null; - // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. - if (!isBroker010()) + if (isBroker08()) { Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); consumerB = consSessB.createConsumer(_queue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java index 8577fb5b6a..33575b58aa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.test.unit.client; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; @@ -34,9 +32,11 @@ import javax.jms.Session; * * Test to validate that setting the respective qpid.declare_queues, * qpid.declare_exchanges system properties functions as expected. + * */ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase { + public void testQueueDeclare() throws Exception { setSystemProperty("qpid.declare_queues", "false"); @@ -53,8 +53,11 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase fail("JMSException should be thrown as the queue does not exist"); } catch (JMSException e) - { - checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + { + assertTrue("Exception should be that the queue does not exist :" + + e.getMessage(), + e.getMessage().contains("does not exist")); + } } @@ -76,15 +79,10 @@ public class DynamicQueueExchangeCreateTest extends QpidBrokerTestCase } catch (JMSException e) { - checkExceptionErrorCode(e, AMQConstant.NOT_FOUND); + assertTrue("Exception should be that the exchange does not exist :" + + e.getMessage(), + e.getMessage().contains("Exchange " + EXCHANGE_TYPE + " does not exist")); } } - private void checkExceptionErrorCode(JMSException original, AMQConstant code) - { - Exception linked = original.getLinkedException(); - assertNotNull("Linked exception should have been set", linked); - assertTrue("Linked exception should be an AMQException", linked instanceof AMQException); - assertEquals("Error code should be " + code.getCode(), code, ((AMQException) linked).getErrorCode()); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index aae8b1feb9..79e2ff8148 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -24,6 +24,7 @@ import junit.textui.TestRunner; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 2e8a2d049d..f0794c9dab 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -25,9 +25,11 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.*; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,67 +49,70 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class); Connection _connection; + private String _brokerlist = "vm://:1"; private Session _session; private static final long SYNC_TIMEOUT = 500; private int TEST = 0; - /** - * Close channel, use chanel with same id ensure error. - * - * This test is only valid for non 0-10 connection . + /* + close channel, use chanel with same id ensure error. */ public void testReusingChannelAfterFullClosure() throws Exception { - _connection=newConnection(); - - // Create Producer - try + // this is testing an inVM Connetion conneciton + if (isJavaBroker() && !isExternalBroker()) { - _connection.start(); - - createChannelAndTest(1); + _connection=newConnection(); - // Cause it to close + // Create Producer try { - _logger.info("Testing invalid exchange"); - declareExchange(1, "", "name_that_will_lookup_to_null", false); - fail("Exchange name is empty so this should fail "); - } - catch (AMQException e) - { - assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); - } + _connection.start(); - // Check that - try - { - _logger.info("Testing valid exchange should fail"); - declareExchange(1, "topic", "amq.topic", false); - fail("This should not succeed as the channel should be closed "); - } - catch (AMQException e) - { - if (_logger.isInfoEnabled()) + createChannelAndTest(1); + + // Cause it to close + try + { + _logger.info("Testing invalid exchange"); + declareExchange(1, "", "name_that_will_lookup_to_null", false); + fail("Exchange name is empty so this should fail "); + } + catch (AMQException e) { - _logger.info("Exception occured was:" + e.getErrorCode()); + assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode()); } - assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); + // Check that + try + { + _logger.info("Testing valid exchange should fail"); + declareExchange(1, "topic", "amq.topic", false); + fail("This should not succeed as the channel should be closed "); + } + catch (AMQException e) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } - _connection=newConnection(); - } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); - checkSendingMessage(); + _connection=newConnection(); + } - _session.close(); - _connection.close(); + checkSendingMessage(); - } - catch (JMSException e) - { - e.printStackTrace(); - fail(e.getMessage()); + _session.close(); + _connection.close(); + + } + catch (JMSException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } } } @@ -301,19 +306,27 @@ public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionLis private Connection newConnection() { - Connection connection = null; + AMQConnection connection = null; try { - connection = getConnection(); + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); - ((AMQConnection) connection).setConnectionListener(this); + connection.setConnectionListener(this); _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); connection.start(); } - catch (Exception e) + catch (JMSException e) + { + fail("Creating new connection when:" + e.getMessage()); + } + catch (AMQException e) + { + fail("Creating new connection when:" + e.getMessage()); + } + catch (URLSyntaxException e) { fail("Creating new connection when:" + e.getMessage()); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 124e756fad..04fc611cd1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -20,30 +20,32 @@ */ package org.apache.qpid.test.unit.client.connection; -import javax.jms.Connection; -import javax.jms.QueueSession; -import javax.jms.TopicSession; - import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.Session; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.BrokerDetails; + +import javax.jms.Connection; +import javax.jms.QueueSession; +import javax.jms.TopicSession; +import javax.naming.NamingException; public class ConnectionTest extends QpidBrokerTestCase { - String _broker_NotRunning = "tcp://localhost:" + findFreePort(); - + String _broker_NotRunning = "vm://:2"; String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs"; public void testSimpleConnection() throws Exception @@ -85,17 +87,17 @@ public class ConnectionTest extends QpidBrokerTestCase AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - sess.declareExchange(new AMQShortString("test.direct"), + + sess.declareExchange(new AMQShortString("test.direct"), ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false); - sess.declareExchange(new AMQShortString("tmp.direct"), + sess.declareExchange(new AMQShortString("tmp.direct"), ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false); - sess.declareExchange(new AMQShortString("tmp.topic"), + sess.declareExchange(new AMQShortString("tmp.topic"), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false); - sess.declareExchange(new AMQShortString("test.topic"), + sess.declareExchange(new AMQShortString("test.topic"), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false); QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); @@ -111,7 +113,7 @@ public class ConnectionTest extends QpidBrokerTestCase queueSession.close(); TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - + AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic"); assertEquals(topic.getExchangeName().toString(), "test.topic"); @@ -269,7 +271,7 @@ public class ConnectionTest extends QpidBrokerTestCase } connection.close(); } - + public void testUnsupportedSASLMechanism() throws Exception { BrokerDetails broker = getBroker(); @@ -287,37 +289,11 @@ public class ConnectionTest extends QpidBrokerTestCase { assertTrue("Incorrect exception thrown", e.getMessage().contains("The following SASL mechanisms " + - "[MY_MECH]" + + "[MY_MECH]" + " specified by the client are not supported by the broker")); } } - public void testClientIDVerification() throws Exception - { - System.setProperty("qpid.verify_client_id", "true"); - BrokerDetails broker = getBroker(); - try - { - Connection con = new AMQConnection(broker.toString(), "guest", "guest", - "client_id", "test"); - - Connection con2 = new AMQConnection(broker.toString(), "guest", "guest", - "client_id", "test"); - - fail("The client should throw a ConnectionException stating the" + - " client ID is not unique"); - } - catch (Exception e) - { - assertTrue("Incorrect exception thrown", - e.getMessage().contains("ClientID must be unique")); - } - finally - { - System.setProperty("qpid.verify_client_id", "false"); - } - } - public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionTest.class); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 5701b5a1fd..278b9e9c04 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.TestNetworkConnection; +import org.apache.qpid.transport.TestNetworkDriver; public class AMQProtocolSessionTest extends QpidBrokerTestCase { - private static class TestProtocolSession extends AMQProtocolSession + private static class AMQProtSession extends AMQProtocolSession { - public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { super(protocolHandler,connection); } - public TestNetworkConnection getNetworkConnection() + public TestNetworkDriver getNetworkDriver() { - return (TestNetworkConnection) _protocolHandler.getNetworkConnection(); + return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); } public AMQShortString genQueueName() @@ -54,7 +54,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase } } - private TestProtocolSession _testSession; + private AMQProtSession _testSession; protected void setUp() throws Exception { @@ -62,10 +62,10 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); - protocolHandler.setNetworkConnection(new TestNetworkConnection()); - + protocolHandler.setNetworkDriver(new TestNetworkDriver()); + //don't care about the values set here apart from the dummy IoSession - _testSession = new TestProtocolSession(protocolHandler , con); + _testSession = new AMQProtSession(protocolHandler , con); } public void testTemporaryQueueWildcard() throws UnknownHostException @@ -93,9 +93,14 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1"); } + public void testTemporaryQueuePipe() throws UnknownHostException + { + checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1"); + } + private void checkTempQueueName(SocketAddress address, String queueName) { - _testSession.getNetworkConnection().setLocalAddress(address); + _testSession.getNetworkDriver().setLocalAddress(address); assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index f5e0ed75d2..de092fc893 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -50,6 +50,7 @@ public class MessageRequeueTest extends QpidBrokerTestCase protected final String queue = "direct://amq.direct//message-requeue-test-queue"; protected String payload = "Message:"; + //protected final String BROKER = "vm://:1"; protected final String BROKER = "tcp://127.0.0.1:5672"; private boolean testReception = true; @@ -154,8 +155,8 @@ public class MessageRequeueTest extends QpidBrokerTestCase _logger.info("consumed: " + messagesReceived); assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); - // with 0_10 we can have a delivery tag of 0 - if (!conn.isBroker010()) + // wit 0_10 we can have a delivery tag of 0 + if (conn.isBroker08()) { for (long b : messageLog) { @@ -223,7 +224,7 @@ public class MessageRequeueTest extends QpidBrokerTestCase StringBuilder list = new StringBuilder(); list.append("Failed to receive:"); int failed = 0; - if (!conn.isBroker010()) + if (conn.isBroker08()) { for (long b : receieved) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java index 80422cf3e9..989ac98747 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -52,7 +52,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase */ public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception { - if (isBrokerStorePersistent()) + if (isBrokerStorePersistent() || !isBroker08()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); @@ -116,7 +116,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase */ public void testDurSubRestoresMessageSelector() throws Exception { - if (isBrokerStorePersistent()) + if (isBrokerStorePersistent() || !isBroker08()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 97452ad1c8..830421a01f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -25,14 +25,12 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.NonQpidObjectMessage; -import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -41,11 +39,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; -import javax.jms.Topic; - import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; /** * @author Apache Software Foundation @@ -169,39 +163,4 @@ public class JMSPropertiesTest extends QpidBrokerTestCase con.close(); } - /** - * Test Goal : Test if custom message properties can be set and retrieved properly with out an error. - * Also test if unsupported properties are filtered out. See QPID-2930. - */ - public void testQpidExtensionProperties() throws Exception - { - Connection con = getConnection("guest", "guest"); - Session ssn = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - con.start(); - - Topic topic = ssn.createTopic("test"); - MessageConsumer consumer = ssn.createConsumer(topic); - MessageProducer prod = ssn.createProducer(topic); - Message m = ssn.createMessage(); - m.setObjectProperty("foo-bar", "foobar".getBytes()); - m.setObjectProperty(QpidMessageProperties.AMQP_0_10_APP_ID, "my-app-id"); - prod.send(m); - - Message msg = consumer.receive(1000); - assertNotNull(msg); - - Enumeration<String> enu = msg.getPropertyNames(); - Map<String,String> map = new HashMap<String,String>(); - while (enu.hasMoreElements()) - { - String name = enu.nextElement(); - String value = msg.getStringProperty(name); - map.put(name, value); - } - - assertFalse("Property 'foo-bar' should have been filtered out",map.containsKey("foo-bar")); - assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_APP_ID + " should be present","my-app-id",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_APP_ID)); - assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_ROUTING_KEY + " should be present","test",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_ROUTING_KEY)); - - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java deleted file mode 100644 index 36bac3b715..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.transacted; - -/** - * This verifies that changing the {@code transactionTimeout} configuration will alter - * the behaviour of the transaction open and idle logging, and that when the connection - * will be closed. - */ -public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestCase -{ - @Override - protected void configure() throws Exception - { - // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); - - // Set transaction timout properties. - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "1000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "100"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "500"); - } - - public void testProducerIdleCommit() throws Exception - { - try - { - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(5, 0); - - check(IDLE); - } - - public void testProducerOpenCommit() throws Exception - { - try - { - send(5, 0.3f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(6, 3); - - check(OPEN); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java deleted file mode 100644 index 71b89bf911..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.transacted; - -/** - * This verifies that the default behaviour is not to time out transactions. - */ -public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase -{ - @Override - protected void configure() throws Exception - { - // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); - } - - public void testProducerIdleCommit() throws Exception - { - try - { - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testProducerOpenCommit() throws Exception - { - try - { - send(5, 0.3f); - - _psession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java deleted file mode 100644 index c912d6a323..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.transacted; - -/** - * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration - * is set for a virtual host. - * - * A producer that is idle for too long or open for too long will have its connection closed and - * any further operations will fail with a 408 resource timeout exception. Consumers will not - * be affected by the transaction timeout configuration. - */ -public class TransactionTimeoutTest extends TransactionTimeoutTestCase -{ - public void testProducerIdle() throws Exception - { - try - { - sleep(2.0f); - - _psession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testProducerIdleCommit() throws Exception - { - try - { - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(5, 0); - - check(IDLE); - } - - public void testProducerOpenCommit() throws Exception - { - try - { - send(6, 0.5f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(0, 10); - - check(OPEN); - } - - public void testProducerIdleCommitTwice() throws Exception - { - try - { - send(5, 0); - - sleep(1.0f); - - _psession.commit(); - - send(5, 0); - - sleep(2.0f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(10, 0); - - check(IDLE); - } - - public void testProducerOpenCommitTwice() throws Exception - { - try - { - send(5, 0); - - sleep(1.0f); - - _psession.commit(); - - send(6, 0.5f); - - _psession.commit(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - // the presistent store generates more idle messages? - monitor(isBrokerStorePersistent() ? 10 : 5, 10); - - check(OPEN); - } - - public void testProducerIdleRollback() throws Exception - { - try - { - send(5, 0); - - sleep(2.0f); - - _psession.rollback(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(5, 0); - - check(IDLE); - } - - public void testProducerIdleRollbackTwice() throws Exception - { - try - { - send(5, 0); - - sleep(1.0f); - - _psession.rollback(); - - send(5, 0); - - sleep(2.0f); - - _psession.rollback(); - fail("should fail"); - } - catch (Exception e) - { - _exception = e; - } - - monitor(10, 0); - - check(IDLE); - } - - public void testConsumerCommitClose() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - expect(1, 0); - - _csession.commit(); - - sleep(3.0f); - - _csession.close(); - } - catch (Exception e) - { - fail("should have succeeded: " + e.getMessage()); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testConsumerIdleReceiveCommit() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - sleep(2.0f); - - expect(1, 0); - - sleep(2.0f); - - _csession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testConsumerIdleCommit() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - expect(1, 0); - - sleep(2.0f); - - _csession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testConsumerIdleRollback() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - expect(1, 0); - - sleep(2.0f); - - _csession.rollback(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testConsumerOpenCommit() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - sleep(3.0f); - - _csession.commit(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } - - public void testConsumerOpenRollback() throws Exception - { - try - { - send(1, 0); - - _psession.commit(); - - sleep(3.0f); - - _csession.rollback(); - } - catch (Exception e) - { - fail("Should have succeeded"); - } - - assertTrue("Listener should not have received exception", _caught.getCount() == 1); - - monitor(0, 0); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java deleted file mode 100644 index 786fc2adb0..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.transacted; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.LogMonitor; - -/** - * The {@link TestCase} for transaction timeout testing. - */ -public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener -{ - public static final String VIRTUALHOST = "test"; - public static final String TEXT = "0123456789abcdefghiforgettherest"; - public static final String CHN_OPEN_TXN = "CHN-1007"; - public static final String CHN_IDLE_TXN = "CHN-1008"; - public static final String IDLE = "Idle"; - public static final String OPEN = "Open"; - - protected LogMonitor _monitor; - protected AMQConnection _con; - protected Session _psession, _csession; - protected Queue _queue; - protected MessageConsumer _consumer; - protected MessageProducer _producer; - protected CountDownLatch _caught = new CountDownLatch(1); - protected String _message; - protected Exception _exception; - protected AMQConstant _code; - - protected void configure() throws Exception - { - // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); - - /* - * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: - * - * <transactionTimeout> - * <openWarn>1000</openWarn> - * <openClose>2000</openClose> - * <idleWarn>500</idleWarn> - * <idleClose>1500</idleClose> - * </transactionTimeout> - */ - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "1000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openClose", "2000"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleWarn", "500"); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.idleClose", "1000"); - } - - protected void setUp() throws Exception - { - // Configure timeouts - configure(); - - // Monitor log file - _monitor = new LogMonitor(_outputFile); - - // Start broker - super.setUp(); - - // Connect to broker - String broker = ("tcp://localhost:" + DEFAULT_PORT); - ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'"); - _con = (AMQConnection) getConnection(url); - _con.setExceptionListener(this); - _con.start(); - - // Create queue - Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); - AMQShortString queueName = new AMQShortString("test"); - _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true); - qsession.close(); - - // Create producer and consumer - producer(); - consumer(); - } - - protected void tearDown() throws Exception - { - try - { - _con.close(); - } - finally - { - super.tearDown(); - } - } - - /** - * Create a transacted persistent message producer session. - */ - protected void producer() throws Exception - { - _psession = _con.createSession(true, Session.SESSION_TRANSACTED); - _producer = _psession.createProducer(_queue); - _producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } - - /** - * Create a transacted message consumer session. - */ - protected void consumer() throws Exception - { - _csession = _con.createSession(true, Session.SESSION_TRANSACTED); - _consumer = _csession.createConsumer(_queue); - } - - /** - * Send a number of messages to the queue, optionally pausing after each. - */ - protected void send(int count, float delay) throws Exception - { - for (int i = 0; i < count; i++) - { - sleep(delay); - Message msg = _psession.createTextMessage(TEXT); - msg.setIntProperty("i", i); - _producer.send(msg); - } - } - - /** - * Sleep for a number of seconds. - */ - protected void sleep(float seconds) throws Exception - { - try - { - Thread.sleep((long) (seconds * 1000.0f)); - } - catch (InterruptedException ie) - { - throw new RuntimeException("Interrupted"); - } - } - - /** - * Check for idle and open messages. - * - * Either exactly zero messages, or +-2 error accepted around the specified number. - */ - protected void monitor(int idle, int open) throws Exception - { - List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN); - List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN); - - String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages"; - String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages"; - - if (idle == 0) - { - assertTrue(idleErr, idleMsgs.isEmpty()); - } - else - { - assertTrue(idleErr, idleMsgs.size() >= idle - 2 && idleMsgs.size() <= idle + 2); - } - - if (open == 0) - { - assertTrue(openErr, openMsgs.isEmpty()); - } - else - { - assertTrue(openErr, openMsgs.size() >= open - 2 && openMsgs.size() <= open + 2); - } - } - - /** - * Receive a number of messages, optionally pausing after each. - */ - protected void expect(int count, float delay) throws Exception - { - for (int i = 0; i < count; i++) - { - sleep(delay); - Message msg = _consumer.receive(1000); - assertNotNull("Message should not be null", msg); - assertTrue("Message should be a text message", msg instanceof TextMessage); - assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText()); - assertEquals("Message order is incorrect", i, msg.getIntProperty("i")); - } - } - - /** - * Checks that the correct exception was thrown and was received - * by the listener with a 506 error code. - */ - protected void check(String reason)throws InterruptedException - { - assertTrue("Should have caught exception in listener", _caught.await(1, TimeUnit.SECONDS)); - assertNotNull("Should have thrown exception to client", _exception); - assertTrue("Exception message should contain '" + reason + "': " + _message, _message.contains(reason + " transaction timed out")); - assertNotNull("Exception should have an error code", _code); - assertEquals("Error code should be 506", AMQConstant.RESOURCE_ERROR, _code); - } - - /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ - public void onException(JMSException jmse) - { - _caught.countDown(); - _message = jmse.getLinkedException().getMessage(); - if (jmse.getLinkedException() instanceof AMQException) - { - _code = ((AMQException) jmse.getLinkedException()).getErrorCode(); - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java deleted file mode 100644 index 8345803d56..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.utils; - -public interface BrokerHolder -{ - void shutdown(); -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 10217585c1..d3b429e315 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -34,10 +34,24 @@ public class FailoverBaseCase extends QpidBrokerTestCase { protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class); + public static int FAILING_VM_PORT = 2; + public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); public static final long DEFAULT_FAILOVER_TIME = 10000L; protected int failingPort; + protected int getFailingPort() + { + if (_broker.equals(VM)) + { + return FAILING_VM_PORT; + } + else + { + return FAILING_PORT; + } + } + protected void setUp() throws java.lang.Exception { super.setUp(); @@ -68,14 +82,6 @@ public class FailoverBaseCase extends QpidBrokerTestCase return _connectionFactory; } - @Override - public void stopBroker(int port) throws Exception - { - if (isBrokerPresent(port)) - { - super.stopBroker(port); - } - } public void tearDown() throws Exception { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java deleted file mode 100644 index 340f00fed8..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.utils; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.Broker; - -public class InternalBrokerHolder implements BrokerHolder -{ - private static final Logger LOGGER = Logger.getLogger(InternalBrokerHolder.class); - private final Broker _broker; - - public InternalBrokerHolder(final Broker broker) - { - if(broker == null) - { - throw new IllegalArgumentException("Broker must not be null"); - } - - _broker = broker; - } - - public void shutdown() - { - LOGGER.info("Shutting down Broker instance"); - - _broker.shutdown(); - - LOGGER.info("Broker instance shutdown"); - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index 1fde6c7c73..ff80c91fac 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -21,8 +21,6 @@ package org.apache.qpid.test.utils; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Set; import javax.management.JMException; @@ -33,17 +31,14 @@ import javax.management.ObjectName; import javax.management.MalformedObjectNameException; import javax.management.remote.JMXConnector; -import junit.framework.TestCase; - import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.commands.objects.AllObjects; import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.LoggingManagement; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.UserManagement; /** @@ -236,10 +231,10 @@ public class JMXTestUtils public ObjectName getVirtualHostManagerObjectName(String vhostName) { // Get the name of the test manager - String query = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" - + ObjectName.quote(vhostName) + ",*"; + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + vhostName + ",*"; - Set<ObjectName> objectNames = queryObjects(query); + Set<ObjectName> objectNames = allObject.returnObjects(); _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size()); @@ -263,14 +258,14 @@ public class JMXTestUtils public ObjectName getQueueObjectName(String virtualHostName, String queue) { // Get the name of the test manager - String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" - + ObjectName.quote(virtualHostName) + ",name=" - + ObjectName.quote(queue) + ",*"; + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + virtualHostName + ",name=" + queue + ",*"; - Set<ObjectName> objectNames = queryObjects(query); + Set<ObjectName> objectNames = allObject.returnObjects(); _test.assertNotNull("Null ObjectName Set returned", objectNames); - _test.assertEquals("Incorrect number of queues with name '" + queue + "' returned", 1, objectNames.size()); + _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring + + "' returned", 1, objectNames.size()); // We have verified we have only one value in objectNames so return it ObjectName objectName = objectNames.iterator().next(); @@ -291,11 +286,10 @@ public class JMXTestUtils public ObjectName getExchangeObjectName(String virtualHostName, String exchange) { // Get the name of the test manager - String query = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" - + ObjectName.quote(virtualHostName) + ",name=" - + ObjectName.quote(exchange) + ",*"; + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" + virtualHostName + ",name=" + exchange + ",*"; - Set<ObjectName> objectNames = queryObjects(query); + Set<ObjectName> objectNames = allObject.returnObjects(); _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number of exchange with name '" + exchange + "' returned", 1, objectNames.size()); @@ -307,9 +301,12 @@ public class JMXTestUtils } @SuppressWarnings("static-access") - public <T> T getManagedObject(Class<T> managedClass, String query) + public <T> T getManagedObject(Class<T> managedClass, String queryString) { - Set<ObjectName> objectNames = queryObjects(query); + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = queryString; + + Set<ObjectName> objectNames = allObject.returnObjects(); _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size()); @@ -324,16 +321,6 @@ public class JMXTestUtils return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, managedClass, false); } - public <T> List<T> getManagedObjectList(Class<T> managedClass, Set<ObjectName> objectNames) - { - List<T> objects = new ArrayList<T>(); - for (ObjectName name : objectNames) - { - objects.add(getManagedObject(managedClass, name)); - } - return objects; - } - public ManagedBroker getManagedBroker(String virtualHost) { return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost)); @@ -368,68 +355,4 @@ public class JMXTestUtils ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement"); return getManagedObject(UserManagement.class, objectName); } - - /** - * Retrive {@link ServerInformation} JMX MBean. - */ - public ServerInformation getServerInformation() - { - // Get the name of the test manager - String query = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*"; - - Set<ObjectName> objectNames = queryObjects(query); - - TestCase.assertNotNull("Null ObjectName Set returned", objectNames); - TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size()); - - // We have verified we have only one value in objectNames so return it - return getManagedObject(ServerInformation.class, objectNames.iterator().next()); - } - - /** - * Retrive all {@link ManagedConnection} objects. - */ - public List<ManagedConnection> getAllManagedConnections() - { - // Get the name of the test manager - String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=*,name=*"; - - Set<ObjectName> objectNames = queryObjects(query); - - TestCase.assertNotNull("Null ObjectName Set returned", objectNames); - - return getManagedObjectList(ManagedConnection.class, objectNames); - } - - /** - * Retrive all {@link ManagedConnection} objects for a particular virtual host. - */ - public List<ManagedConnection> getManagedConnections(String vhost) - { - // Get the name of the test manager - String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + ObjectName.quote(vhost) + ",name=*"; - - Set<ObjectName> objectNames = queryObjects(query); - - TestCase.assertNotNull("Null ObjectName Set returned", objectNames); - - return getManagedObjectList(ManagedConnection.class, objectNames); - } - - /** - * Returns the Set of ObjectNames returned by the broker for the given query, - * or null if there is problem while performing the query. - */ - private Set<ObjectName> queryObjects(String query) - { - try - { - return _mbsc.queryNames(new ObjectName(query), null); - } - catch (Exception e) - { - e.printStackTrace(); - return null; - } - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index c8ccdf91bb..ae38a75e7a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -25,7 +25,6 @@ import java.io.InputStreamReader; import java.io.LineNumberReader; import java.io.PrintStream; import java.net.MalformedURLException; -import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,20 +54,19 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; -import org.apache.qpid.server.Broker; -import org.apache.qpid.server.BrokerOptions; -import org.apache.qpid.server.ProtocolExclusion; import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.store.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.LogMonitor; /** @@ -76,13 +74,6 @@ import org.apache.qpid.util.LogMonitor; */ public class QpidBrokerTestCase extends QpidTestCase { - - public enum BrokerType - { - EXTERNAL /** Test case relies on a Broker started independently of the test-suite */, - INTERNAL /** Test case starts an embedded broker within this JVM */, - SPAWNED /** Test case spawns a new broker as a separate process */ - } protected final String QpidHome = System.getProperty("QPID_HOME"); protected File _configFile = new File(System.getProperty("broker.config")); @@ -91,6 +82,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected long RECEIVE_TIMEOUT = 1000l; + private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>(); @@ -114,11 +106,9 @@ public class QpidBrokerTestCase extends QpidTestCase // system properties private static final String BROKER_LANGUAGE = "broker.language"; - private static final String BROKER_TYPE = "broker.type"; - private static final String BROKER_COMMAND = "broker.command"; + private static final String BROKER = "broker"; private static final String BROKER_CLEAN = "broker.clean"; private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; - private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; private static final String BROKER_VERSION = "broker.version"; protected static final String BROKER_READY = "broker.ready"; private static final String BROKER_STOPPED = "broker.stopped"; @@ -126,30 +116,29 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave"; private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; - private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; - // values protected static final String JAVA = "java"; protected static final String CPP = "cpp"; + protected static final String VM = "vm"; + protected static final String EXTERNAL = "external"; + private static final String VERSION_08 = "0-8"; + private static final String VERSION_010 = "0-10"; protected static final String QPID_HOME = "QPID_HOME"; public static final int DEFAULT_VM_PORT = 1; public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT); - public static final int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT); public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.sslport", ServerConfiguration.DEFAULT_SSL_PORT); protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); - protected BrokerType _brokerType = BrokerType.valueOf(System.getProperty(BROKER_TYPE, "").toUpperCase()); - protected String _brokerCommand = System.getProperty(BROKER_COMMAND); + protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); - private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, "")); + private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); protected String _output = System.getProperty(TEST_OUTPUT); protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); - private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES); protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: "); protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE); @@ -158,7 +147,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected PrintStream _brokerOutputStream; - protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>(); + protected Map<Integer, Process> _brokers = new HashMap<Integer, Process>(); protected InitialContext _initialContext; protected AMQConnectionFactory _connectionFactory; @@ -294,16 +283,6 @@ public class QpidBrokerTestCase extends QpidTestCase fail("Unable to test without config file:" + _configFile); } - String existingQpidWorkPath = System.getProperty(BROKER_EXISTING_QPID_WORK); - if(existingQpidWorkPath != null && !existingQpidWorkPath.equals("")) - { - cleanBroker(); - - File existing = new File(existingQpidWorkPath); - File qpidWork = new File(getQpidWork(_brokerType, getPort())); - FileUtils.copyRecursive(existing, qpidWork); - } - startBroker(); } @@ -404,8 +383,13 @@ public class QpidBrokerTestCase extends QpidTestCase } } + public void startBroker() throws Exception + { + startBroker(0); + } + /** - * Return the management port in use by the broker on this main port + * Return the management portin use by the broker on this main port * * @param mainPort the broker's main port. * @@ -413,7 +397,7 @@ public class QpidBrokerTestCase extends QpidTestCase */ protected int getManagementPort(int mainPort) { - return mainPort + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT); + return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT)); } /** @@ -428,7 +412,11 @@ public class QpidBrokerTestCase extends QpidTestCase protected int getPort(int port) { - if (!_brokerType.equals(BrokerType.EXTERNAL)) + if (_broker.equals(VM)) + { + return port == 0 ? DEFAULT_VM_PORT : port; + } + else if (!_broker.equals(EXTERNAL)) { return port == 0 ? DEFAULT_PORT : port; } @@ -440,18 +428,11 @@ public class QpidBrokerTestCase extends QpidTestCase protected String getBrokerCommand(int port) throws MalformedURLException { - final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port); - return _brokerCommand + return _broker .replace("@PORT", "" + port) .replace("@SSL_PORT", "" + (port - 1)) .replace("@MPORT", "" + getManagementPort(port)) - .replace("@CONFIG_FILE", _configFile.toString()) - .replace("@EXCLUDES", protocolExcludesList); - } - - public void startBroker() throws Exception - { - startBroker(0); + .replace("@CONFIG_FILE", _configFile.toString()); } public void startBroker(int port) throws Exception @@ -462,38 +443,38 @@ public class QpidBrokerTestCase extends QpidTestCase saveTestConfiguration(); saveTestVirtualhosts(); - if(_brokers.get(port) != null) - { - throw new IllegalStateException("There is already an existing broker running on port " + port); - } - - if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker()) + Process process = null; + if (_broker.equals(VM)) { + setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port))); setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); saveTestConfiguration(); - - BrokerOptions options = new BrokerOptions(); - options.setConfigFile(_configFile.getAbsolutePath()); - options.addPort(port); - - addExcludedPorts(port, options); - - options.setJmxPort(getManagementPort(port)); - - //Set the log config file, relying on the log4j.configuration system property - //set on the JVM by the JUnit runner task in module.xml. - options.setLogConfigFile(new URL(System.getProperty("log4j.configuration")).getFile()); - - Broker broker = new Broker(); - _logger.info("starting internal broker (same JVM)"); - broker.startup(options); - - _brokers.put(port, new InternalBrokerHolder(broker)); + + // create an in_VM broker + final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile); + try + { + ApplicationRegistry.initialise(registry, port); + } + catch (Exception e) + { + _logger.error("Broker initialise failed due to:",e); + try + { + registry.close(); + } + catch (Throwable closeE) + { + closeE.printStackTrace(); + } + throw e; + } + TransportConnection.createVMBroker(port); } - else if (!_brokerType.equals(BrokerType.EXTERNAL)) + else if (!_broker.equals(EXTERNAL)) { String cmd = getBrokerCommand(port); - _logger.info("starting external broker: " + cmd); + _logger.info("starting broker: " + cmd); ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); @@ -509,7 +490,7 @@ public class QpidBrokerTestCase extends QpidTestCase // DON'T change PNAME, qpid.stop needs this value. env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\""); // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests - env.put("QPID_WORK", getQpidWork(_brokerType, port)); + env.put("QPID_WORK", System.getProperty("QPID_WORK")+ "/" + port); // Use the environment variable to set amqj.logging.level for the broker @@ -561,7 +542,8 @@ public class QpidBrokerTestCase extends QpidTestCase env.put("QPID_OPTS", QPID_OPTS); } } - Process process = pb.start();; + + process = pb.start(); Piper p = new Piper(process.getInputStream(), _brokerOutputStream, @@ -582,7 +564,6 @@ public class QpidBrokerTestCase extends QpidTestCase try { - //test that the broker is still running and hasn't exited unexpectedly int exit = process.exitValue(); _logger.info("broker aborted: " + exit); cleanBroker(); @@ -590,58 +571,11 @@ public class QpidBrokerTestCase extends QpidTestCase } catch (IllegalThreadStateException e) { - // this is expect if the broker started successfully + // this is expect if the broker started succesfully } - - _brokers.put(port, new SpawnedBrokerHolder(process)); } - } - private void addExcludedPorts(int port, BrokerOptions options) - { - final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port); - - if (protocolExcludesList.equals("")) - { - return; - } - final String[] toks = protocolExcludesList.split("\\s"); - - if(toks.length % 2 != 0) - { - throw new IllegalArgumentException("Must be an even number of tokens in '" + protocolExcludesList + "'"); - } - for (int i = 0; i < toks.length; i=i+2) - { - String excludeArg = toks[i]; - final int excludedPort = Integer.parseInt(toks[i+1]); - options.addExcludedPort(ProtocolExclusion.lookup(excludeArg), excludedPort); - - _logger.info("Adding protocol exclusion " + excludeArg + " " + excludedPort); - } - } - - private boolean existingInternalBroker() - { - for(BrokerHolder holder : _brokers.values()) - { - if(holder instanceof InternalBrokerHolder) - { - return true; - } - } - - return false; - } - - private String getQpidWork(BrokerType broker, int port) - { - if (!broker.equals(BrokerType.EXTERNAL)) - { - return System.getProperty("QPID_WORK")+ "/" + port; - } - - return System.getProperty("QPID_WORK"); + _brokers.put(port, process); } public String getTestConfigFile() @@ -722,17 +656,20 @@ public class QpidBrokerTestCase extends QpidTestCase port = getPort(port); _logger.info("stopping broker: " + getBrokerCommand(port)); - BrokerHolder broker = _brokers.remove(port); - broker.shutdown(); + Process process = _brokers.remove(port); + if (process != null) + { + process.destroy(); + process.waitFor(); + _logger.info("broker exited: " + process.exitValue()); + } + else if (_broker.equals(VM)) + { + TransportConnection.killVMBroker(port); + ApplicationRegistry.remove(port); + } } - public boolean isBrokerPresent(int port) throws Exception - { - port = getPort(port); - - return _brokers.containsKey(port); - } - /** * Attempt to set the Java Broker to use the BDBMessageStore for persistence * Falling back to the DerbyMessageStore if @@ -874,14 +811,20 @@ public class QpidBrokerTestCase extends QpidTestCase } /** - * Set a System property for the client (and broker if using the same vm) of this test. + * Set a System (-D) property for the external Broker of this test. * * @param property The property to set * @param value the value to set it to. */ protected void setTestClientSystemProperty(String property, String value) { - setTestSystemProperty(property, value); + if (!_propertiesSetForTestOnly.containsKey(property)) + { + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); } /** @@ -889,7 +832,20 @@ public class QpidBrokerTestCase extends QpidTestCase */ protected void revertSystemProperties() { - revertTestSystemProperties(); + for (String key : _propertiesSetForTestOnly.keySet()) + { + String value = _propertiesSetForTestOnly.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + + _propertiesSetForTestOnly.clear(); // We don't change the current VMs settings for Broker only properties // so we can just clear this map @@ -948,17 +904,17 @@ public class QpidBrokerTestCase extends QpidTestCase */ public boolean isBroker08() { - return _brokerVersion.equals(AmqpProtocolVersion.v0_8); + return _brokerVersion.equals(VERSION_08); } public boolean isBroker010() { - return _brokerVersion.equals(AmqpProtocolVersion.v0_10); + return _brokerVersion.equals(VERSION_010); } protected boolean isJavaBroker() { - return _brokerLanguage.equals("java") || _brokerType.equals("vm"); + return _brokerLanguage.equals("java") || _broker.equals("vm"); } protected boolean isCppBroker() @@ -968,14 +924,9 @@ public class QpidBrokerTestCase extends QpidTestCase protected boolean isExternalBroker() { - return !_brokerType.equals("vm"); //TODO + return !_broker.equals("vm"); } - - protected boolean isInternalBroker() - { - return _brokerType.equals(BrokerType.INTERNAL); - } - + protected boolean isBrokerStorePersistent() { return _brokerPersistent; @@ -1047,6 +998,11 @@ public class QpidBrokerTestCase extends QpidTestCase */ public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException { + if (_broker.equals(VM)) + { + factoryName += ".vm"; + } + return (AMQConnectionFactory) getInitialContext().lookup(factoryName); } @@ -1087,7 +1043,15 @@ public class QpidBrokerTestCase extends QpidTestCase public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException { _logger.info("get Connection"); - Connection con = getConnectionFactory().createConnection(username, password, id); + Connection con; + if (_broker.equals(VM)) + { + con = new AMQConnection("vm://:1", username, password, id, "test"); + } + else + { + con = getConnectionFactory().createConnection(username, password, id); + } //add the connection in the lis of connections _connections.add(con); return con; @@ -1125,8 +1089,7 @@ public class QpidBrokerTestCase extends QpidTestCase c.close(); } } - finally - { + finally{ // Ensure any problems with close does not interfer with property resets revertSystemProperties(); revertLoggingLevels(); @@ -1227,8 +1190,7 @@ public class QpidBrokerTestCase extends QpidTestCase MessageProducer producer = session.createProducer(destination); - int i = offset; - for (; i < (count + offset); i++) + for (int i = offset; i < (count + offset); i++) { Message next = createNextMessage(session, i); @@ -1251,7 +1213,7 @@ public class QpidBrokerTestCase extends QpidTestCase // we have no batchSize or // our count is not divible by batchSize. if (session.getTransacted() && - ( batchSize == 0 || (i-1) % batchSize != 0)) + ( batchSize == 0 || count % batchSize != 0)) { session.commit(); } @@ -1346,26 +1308,29 @@ public class QpidBrokerTestCase extends QpidTestCase */ public void reloadBrokerSecurityConfig() throws Exception { - JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin"); - jmxu.open(); - - try + if (_broker.equals(VM)) { - ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); - configMBean.reloadSecurityConfiguration(); + ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections(); } - finally + else { - jmxu.close(); - } - - LogMonitor _monitor = new LogMonitor(_outputFile); - assertTrue("The expected server security configuration reload did not occur", - _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); - } + JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin"); + jmxu.open(); + + try + { + ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); + configMBean.reloadSecurityConfiguration(); + } + finally + { + jmxu.close(); + } + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected server security configuration reload did not occur", + _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); - protected int getFailingPort() - { - return FAILING_PORT; + } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java deleted file mode 100644 index 65239bbe02..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.utils; - -import org.apache.log4j.Logger; - -public class SpawnedBrokerHolder implements BrokerHolder -{ - private static final Logger LOGGER = Logger.getLogger(SpawnedBrokerHolder.class); - - private final Process _process; - - public SpawnedBrokerHolder(final Process process) - { - if(process == null) - { - throw new IllegalArgumentException("Process must not be null"); - } - - _process = process; - } - - public void shutdown() - { - LOGGER.info("Destroying broker process"); - - _process.destroy(); - - try - { - _process.waitFor(); - LOGGER.info("broker exited: " + _process.exitValue()); - } - catch (InterruptedException e) - { - LOGGER.error("Interrupted whilst waiting for process destruction"); - Thread.currentThread().interrupt(); - } - } -} |