diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/systests/src/main | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/main')
38 files changed, 1897 insertions, 2746 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 526db29181..4b766864b4 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -19,7 +19,13 @@ package org.apache.qpid.client.failover; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.test.utils.FailoverBaseCase; @@ -36,10 +42,14 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; +import javax.naming.NamingException; + import java.text.MessageFormat; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -760,6 +770,181 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio //got started, before allowing the test to tear down awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); } + + /** + * This test only tests 0-8/0-9/0-9-1 failover timeout + */ + public void testFailoverHandlerTimeoutExpires() throws Exception + { + _connection.close(); + setTestSystemProperty("qpid.failover_method_timeout", "10000"); + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + // holding failover mutex should prevent the failover from proceeding + synchronized(connection.getFailoverMutex()) + { + killBroker(); + startBroker(); + + // sleep interval exceeds failover timeout interval + Thread.sleep(11000l); + } + + // allows the failover thread to proceed + Thread.yield(); + assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS)); + assertTrue("Failover should not succeed due to timeout", connection.isClosed()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + public void testFailoverHandlerTimeoutReconnected() throws Exception + { + _connection.close(); + setTestSystemProperty("qpid.failover_method_timeout", "10000"); + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + // holding failover mutex should prevent the failover from proceeding + synchronized(connection.getFailoverMutex()) + { + killBroker(); + startBroker(); + } + + // allows the failover thread to proceed + Thread.yield(); + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + assertFalse("Failover should restore connectivity", connection.isClosed()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + /** + * Tests that the producer flow control flag is reset when failover occurs while + * the producers are being blocked by the broker. + * + * Uses Java broker specific queue configuration to enabled PSFC. + */ + public void testFlowControlFlagResetOnFailover() throws Exception + { + // we do not need the connection failing to second broker + _connection.close(); + + // make sure that failover timeout is bigger than flow control timeout + setTestSystemProperty("qpid.failover_method_timeout", "60000"); + setTestSystemProperty("qpid.flow_control_wait_failure", "10000"); + + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2); + final AtomicInteger counter = new AtomicInteger(); + // try to send 5 messages (should block after 4) + new Thread(new Runnable() + { + @Override + public void run() + { + try + { + MessageProducer producer = producerSession.createProducer(queue); + for (int i=0; i < 5; i++) + { + Message next = createNextMessage(producerSession, i); + producer.send(next); + producerSession.commit(); + counter.incrementAndGet(); + } + } + catch(Exception e) + { + // ignore + } + } + }).start(); + + long limit= 30000l; + long start = System.currentTimeMillis(); + + // wait until session is blocked + while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit) + { + Thread.sleep(100l); + } + + assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + // Message counter could be 3 or 4 depending on the progression of producing thread relative + // to the receipt of the ChannelFlow. + final int currentCounter = counter.get(); + assertTrue("Unexpected number of sent messages", currentCounter == 3 || currentCounter == 4); + + killBroker(); + startBroker(); + + // allows the failover thread to proceed + Thread.yield(); + awaitForFailoverCompletion(60000l); + + assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity", capacity); + arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); + ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments); + Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true + + "'&autodelete='" + true + "'"); + ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); + return queue; + } + + private AMQConnection createConnectionWithFailover() throws NamingException, JMSException + { + AMQConnection connection; + AMQConnectionFactory connectionFactory = (AMQConnectionFactory)getConnectionFactory("default"); + ConnectionURL connectionURL = connectionFactory.getConnectionURL(); + connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER, "singlebroker"); + connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE, "2"); + BrokerDetails details = connectionURL.getBrokerDetails(0); + details.setProperty(BrokerDetails.OPTIONS_RETRY, "200"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "1000"); + + connection = (AMQConnection)connectionFactory.createConnection("admin", "admin"); + connection.setConnectionListener(this); + return connection; + } + /** * Tests {@link Session#close()} for session with given acknowledge mode * to ensure that close works after failover. diff --git a/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java index c4b1b08eea..787e727e66 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client.message; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.client.message; * under the License. * */ +package org.apache.qpid.client.message; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java index 1cd088b736..39689f5096 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java @@ -46,6 +46,7 @@ public class SSLTest extends QpidBrokerTestCase setTestClientSystemProperty("profile.use_ssl", "true"); setConfigurationProperty("connector.ssl.enabled", "true"); setConfigurationProperty("connector.ssl.sslOnly", "true"); + setConfigurationProperty("connector.ssl.wantClientAuth", "true"); } // set the ssl system properties diff --git a/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java index ac29b72620..e18f70b01d 100644 --- a/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java +++ b/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java @@ -24,21 +24,72 @@ import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.FileUtils; +import org.apache.qpid.test.unit.xa.AbstractXATestCase; +import org.apache.qpid.client.AMQXAResource; + +import org.apache.qpid.dtx.XidImpl; import javax.jms.XAConnection; import javax.jms.XAConnectionFactory; import javax.jms.XASession; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; -public class XAResourceTest extends QpidBrokerTestCase +public class XAResourceTest extends AbstractXATestCase { private static final String FACTORY_NAME = "default"; private static final String ALT_FACTORY_NAME = "connection2"; + public void init() throws Exception + { + } + + public void testIsSameRMJoin() throws Exception + { + XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); + XAConnection conn1 = factory.createXAConnection("guest", "guest"); + XAConnection conn2 = factory.createXAConnection("guest", "guest"); + XAConnection conn3 = factory.createXAConnection("guest", "guest"); + + XASession session1 = conn1.createXASession(); + XASession session2 = conn2.createXASession(); + XASession session3 = conn3.createXASession(); + + AMQXAResource xaResource1 = (AMQXAResource)session1.getXAResource(); + AMQXAResource xaResource2 = (AMQXAResource)session2.getXAResource(); + AMQXAResource xaResource3 = (AMQXAResource)session3.getXAResource(); + + Xid xid = getNewXid(); + + xaResource1.start(xid, XAResource.TMNOFLAGS); + assertTrue("XAResource isSameRM", xaResource1.isSameRM(xaResource2)); + xaResource2.start(xid, XAResource.TMJOIN); + assertTrue("AMQXAResource siblings should be 1", xaResource1.getSiblings().size() == 1); + + assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource2.getSiblings().size() == 0); + + assertTrue("XAResource isSameRM", xaResource2.isSameRM(xaResource3)); + + + xaResource3.start(xid, XAResource.TMJOIN); + assertTrue("AMQXAResource siblings should be 1", xaResource2.getSiblings().size() == 1); + + xaResource1.end(xid, XAResource.TMSUCCESS); + assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource1.getSiblings().size() == 0); + + xaResource1.prepare(xid); + xaResource1.commit(xid, false); + + conn3.close(); + conn2.close(); + conn1.close(); + } + /* * Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal, - * as they originate from the same session. + * as they originate from the same session. */ public void testIsSameRMSingleCF() throws Exception { @@ -47,14 +98,14 @@ public class XAResourceTest extends QpidBrokerTestCase XASession session = conn.createXASession(); XAResource xaResource1 = session.getXAResource(); XAResource xaResource2 = session.getXAResource(); - + assertEquals("XAResource objects not equal", xaResource1, xaResource2); assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); - + session.close(); conn.close(); } - + /* * Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be * equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true. @@ -67,11 +118,11 @@ public class XAResourceTest extends QpidBrokerTestCase XAConnectionFactory factory = new AMQConnectionFactory(url); XAConnectionFactory factory2 = new AMQConnectionFactory(url); XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME); - + XAConnection conn = factory.createXAConnection("guest","guest"); XAConnection conn2 = factory2.createXAConnection("guest","guest"); XAConnection conn3 = factory3.createXAConnection("guest","guest"); - + XASession session = conn.createXASession(); XASession session2 = conn2.createXASession(); XASession session3 = conn3.createXASession(); @@ -79,14 +130,14 @@ public class XAResourceTest extends QpidBrokerTestCase XAResource xaResource1 = session.getXAResource(); XAResource xaResource2 = session2.getXAResource(); XAResource xaResource3 = session3.getXAResource(); - + assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2)); assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2)); assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3)); - + conn.close(); conn2.close(); - conn3.close(); + conn3.close(); } @Override @@ -103,5 +154,5 @@ public class XAResourceTest extends QpidBrokerTestCase FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); } } - + } diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java deleted file mode 100644 index ba0f955d76..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedExchange; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.management.MBeanException; -import javax.management.ObjectName; -import java.util.Collections; -import java.util.Map; - -/** - * Tests the JMX API for the Managed Broker. - * - */ -public class ManagedBrokerMBeanTest extends QpidBrokerTestCase -{ - /** - * Test virtual host - */ - private static final String VIRTUAL_HOST = "test"; - - /** - * Test exchange type - */ - private static final String EXCHANGE_TYPE = "topic"; - - /** - * JMX helper. - */ - private JMXTestUtils _jmxUtils; - private ManagedBroker _managedBroker; - - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this); - _jmxUtils.setUp(); - super.setUp(); - _jmxUtils.open(); - _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); - } - - public void tearDown() throws Exception - { - if (_jmxUtils != null) - { - _jmxUtils.close(); - } - super.tearDown(); - } - - /** - * Tests queue creation/deletion also verifying the automatic binding to the default exchange. - */ - public void testCreateQueueAndDeletion() throws Exception - { - final String queueName = getTestQueueName(); - final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - // Check that bind does not exist before queue creation - assertFalse("Binding to " + queueName + " should not exist in default exchange before queue creation", - defaultExchange.bindings().containsKey(new String[] {queueName})); - - _managedBroker.createNewQueue(queueName, "testowner", true); - - // Ensure the queue exists - assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); - assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); - - // Now verify that the default exchange has been bound. - assertTrue("Binding to " + queueName + " should exist in default exchange after queue creation", - defaultExchange.bindings().containsKey(new String[] {queueName})); - - // Now delete the queue - _managedBroker.deleteQueue(queueName); - - // Finally ensure that the binding has been removed. - assertFalse("Binding to " + queueName + " should not exist in default exchange after queue deletion", - defaultExchange.bindings().containsKey(new String[] {queueName})); - } - - /** - * Tests exchange creation/deletion via JMX API. - */ - public void testCreateExchangeAndUnregister() throws Exception - { - String exchangeName = getTestName(); - _managedBroker.createNewExchange(exchangeName, "topic", true); - String queryString = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" - + ObjectName.quote(VIRTUAL_HOST) + ",name=" + ObjectName.quote(exchangeName) + ",ExchangeType=" - + EXCHANGE_TYPE; - ManagedExchange exchange = _jmxUtils.getManagedObject(ManagedExchange.class, queryString); - assertNotNull("Exchange should exist", exchange); - - _managedBroker.unregisterExchange(exchangeName); - assertFalse("Exchange should have been removed", _jmxUtils.isManagedObjectExist(queryString)); - } - - /** - * Tests that it is disallowed to unregister the default exchange. - */ - public void testUnregisterOfDefaultExchangeDisallowed() throws Exception - { - String defaultExchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(); - - try - { - _managedBroker.unregisterExchange(defaultExchangeName); - fail("Exception not thrown"); - } - catch (MBeanException mbe) - { - // PASS - assertEquals("Error in unregistering exchange " + defaultExchangeName, mbe.getMessage()); - assertTrue(mbe.getCause().getMessage().contains("Cannot unregister the default exchange")); - } - final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName); - assertNotNull("Exchange should exist", defaultExchange); - } - - /** - * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests - * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. - */ - public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception - { - final String queueName = getName(); - final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); - - final Integer deliveryCount = 1; - final Map<String, Object> args = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); - managedBroker.createNewQueue(queueName, "testowner", true, args); - - // Ensure the queue exists - assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); - assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); - - final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); - assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java deleted file mode 100644 index 3fc370dc68..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.management.jmx; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.management.common.mbeans.ManagedConnection; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.JMException; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.TabularData; -import java.io.IOException; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - -public class ManagedConnectionMBeanTest extends QpidBrokerTestCase -{ - private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnectionMBeanTest.class); - - /** - * JMX helper. - */ - private JMXTestUtils _jmxUtils; - private Connection _connection; - - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this); - _jmxUtils.setUp(); - super.setUp(); - _jmxUtils.open(); - _connection = getConnection(); - } - - public void tearDown() throws Exception - { - if (_jmxUtils != null) - { - _jmxUtils.close(); - } - super.tearDown(); - } - - public void testChannels() throws Exception - { - final String queueName = getTestQueueName(); - - final Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); - final Destination destination = session.createQueue(queueName); - final MessageConsumer consumer = session.createConsumer(destination); - - final int numberOfMessages = 2; - sendMessage(session, destination, numberOfMessages); - _connection.start(); - - for (int i = 0; i < numberOfMessages; i++) - { - final Message m = consumer.receive(1000l); - assertNotNull("Message " + i + " is not received", m); - } - - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - - TabularData channelsData = mBean.channels(); - assertNotNull("Channels data are null", channelsData); - assertEquals("Unexpected number of rows in channel table", 1, channelsData.size()); - - final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator(); - final CompositeDataSupport row = rowItr.next(); - Number unackCount = (Number) row.get(ManagedConnection.UNACKED_COUNT); - final Boolean transactional = (Boolean) row.get(ManagedConnection.TRANSACTIONAL); - final Boolean flowBlocked = (Boolean) row.get(ManagedConnection.FLOW_BLOCKED); - assertNotNull("Channel should have unacknowledged messages", unackCount); - assertEquals("Unexpected number of unacknowledged messages", 2, unackCount.intValue()); - assertNotNull("Channel should have transaction flag", transactional); - assertTrue("Unexpected transaction flag", transactional); - assertNotNull("Channel should have flow blocked flag", flowBlocked); - assertFalse("Unexpected value of flow blocked flag", flowBlocked); - - final Date initialLastIOTime = mBean.getLastIoTime(); - session.commit(); - assertTrue("Last IO time should have been updated", mBean.getLastIoTime().after(initialLastIOTime)); - - channelsData = mBean.channels(); - assertNotNull("Channels data are null", channelsData); - assertEquals("Unexpected number of rows in channel table", 1, channelsData.size()); - - final Iterator<CompositeDataSupport> rowItr2 = (Iterator<CompositeDataSupport>) channelsData.values().iterator(); - final CompositeDataSupport row2 = rowItr2.next(); - unackCount = (Number) row2.get(ManagedConnection.UNACKED_COUNT); - assertNotNull("Channel should have unacknowledged messages", unackCount); - assertEquals("Unexpected number of anacknowledged messages", 0, unackCount.intValue()); - - _connection.close(); - - LOGGER.debug("Querying JMX for number of open connections"); - connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans after connection closed", 0, connections.size()); - } - - public void testCommit() throws Exception - { - final String queueName = getTestQueueName(); - - final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - final Destination destination = producerSession.createQueue(queueName); - final MessageConsumer consumer = consumerSession.createConsumer(destination); - final MessageProducer producer = producerSession.createProducer(destination); - - _connection.start(); - - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - - final int numberOfMessages = 2; - for (int i = 0; i < numberOfMessages; i++) - { - producer.send(producerSession.createTextMessage("Test " + i)); - } - - // sync to make sure that messages are received on the broker - // before we commit via JMX - ((AMQSession<?, ?>) producerSession).sync(); - - Message m = consumer.receive(500l); - assertNull("Unexpected message received", m); - - Number channelId = getFirstTransactedChannelId(mBean, 2); - mBean.commitTransactions(channelId.intValue()); - - for (int i = 0; i < numberOfMessages; i++) - { - m = consumer.receive(1000l); - assertNotNull("Message " + i + " is not received", m); - assertEquals("Unexpected message received at " + i, "Test " + i, ((TextMessage) m).getText()); - } - producerSession.commit(); - m = consumer.receive(500l); - assertNull("Unexpected message received", m); - } - - protected Number getFirstTransactedChannelId(final ManagedConnection mBean, int channelNumber) throws IOException, JMException - { - TabularData channelsData = mBean.channels(); - assertNotNull("Channels data are null", channelsData); - assertEquals("Unexpected number of rows in channel table", channelNumber, channelsData.size()); - final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator(); - while (rowItr.hasNext()) - { - final CompositeDataSupport row = rowItr.next(); - Boolean transacted = (Boolean) row.get(ManagedConnection.TRANSACTIONAL); - if (transacted.booleanValue()) - { - return (Number) row.get(ManagedConnection.CHAN_ID); - } - } - return null; - } - - public void testRollback() throws Exception - { - final String queueName = getTestQueueName(); - - final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - final Destination destination = producerSession.createQueue(queueName); - final MessageConsumer consumer = consumerSession.createConsumer(destination); - final MessageProducer producer = producerSession.createProducer(destination); - - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - - final int numberOfMessages = 2; - for (int i = 0; i < numberOfMessages; i++) - { - producer.send(producerSession.createTextMessage("Test " + i)); - } - - // sync to make sure that messages are received on the broker - // before we rollback via JMX - ((AMQSession<?, ?>) producerSession).sync(); - - Number channelId = getFirstTransactedChannelId(mBean, 2); - mBean.rollbackTransactions(channelId.intValue()); - - Message m = consumer.receive(1000l); - assertNull("Unexpected message received: " + String.valueOf(m), m); - - producerSession.commit(); - - _connection.start(); - m = consumer.receive(1000l); - assertNull("Unexpected message received after commit " + String.valueOf(m), m); - } - - public void testAuthorisedId() throws Exception - { - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - assertEquals("Unexpected authorized id", "guest", mBean.getAuthorizedId()); - } - - public void testClientVersion() throws Exception - { - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - - String expectedVersion = QpidProperties.getReleaseVersion(); - assertNotNull("version should not be null", expectedVersion); - assertFalse("version should not be the empty string", expectedVersion.equals("")); - assertFalse("version should not be the string 'null'", expectedVersion.equals("null")); - - assertEquals("Unexpected version", expectedVersion, mBean.getVersion()); - } - - public void testClientId() throws Exception - { - List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test"); - assertNotNull("Connection MBean is not found", connections); - assertEquals("Unexpected number of connection mbeans", 1, connections.size()); - final ManagedConnection mBean = connections.get(0); - assertNotNull("Connection MBean is null", mBean); - - String expectedClientId = _connection.getClientID(); - assertNotNull("ClientId should not be null", expectedClientId); - assertFalse("ClientId should not be the empty string", expectedClientId.equals("")); - assertFalse("ClientId should not be the string 'null'", expectedClientId.equals("null")); - - assertEquals("Unexpected ClientId", expectedClientId, mBean.getClientId()); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java deleted file mode 100644 index 244e547e02..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.management.jmx; - -import org.apache.commons.lang.time.FastDateFormat; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.queue.AMQQueueMBean; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Tests the JMX API for the Managed Queue. - * - */ -public class ManagedQueueMBeanTest extends QpidBrokerTestCase -{ - protected static final Logger LOGGER = Logger.getLogger(ManagedQueueMBeanTest.class); - - private JMXTestUtils _jmxUtils; - private Connection _connection; - private Session _session; - - private String _sourceQueueName; - private String _destinationQueueName; - private Destination _sourceQueue; - private Destination _destinationQueue; - private ManagedQueue _managedSourceQueue; - private ManagedQueue _managedDestinationQueue; - - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this); - _jmxUtils.setUp(); - - super.setUp(); - _sourceQueueName = getTestQueueName() + "_src"; - _destinationQueueName = getTestQueueName() + "_dest"; - - _connection = getConnection(); - _connection.start(); - - _session = _connection.createSession(true, Session.SESSION_TRANSACTED); - _sourceQueue = _session.createQueue(_sourceQueueName); - _destinationQueue = _session.createQueue(_destinationQueueName); - createQueueOnBroker(_sourceQueue); - createQueueOnBroker(_destinationQueue); - - _jmxUtils.open(); - - _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); - _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); - } - - public void tearDown() throws Exception - { - if (_jmxUtils != null) - { - _jmxUtils.close(); - } - super.tearDown(); - } - - /** - * Tests {@link ManagedQueue#viewMessages(long, long)} interface. - */ - public void testViewSingleMessage() throws Exception - { - final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); - syncSession(_session); - final Message sentMessage = sentMessages.get(0); - - assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); - - // Check the contents of the message - final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); - assertEquals("Unexpected number of rows in table", 1, tab.size()); - final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); - - final CompositeData row1 = rowItr.next(); - assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); - assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); - assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); - - // Check the contents of header (encoded in a string array) - final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER); - assertNotNull("Expected message header array", headerArray); - final Map<String, String> headers = headerArrayToMap(headerArray); - - final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID(); - final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp()); - assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID")); - assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority")); - assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp")); - } - - /** - * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. - */ - public void testMoveMessagesBetweenQueues() throws Exception - { - final int numberOfMessagesToSend = 10; - - sendMessage(_session, _sourceQueue, numberOfMessagesToSend); - syncSession(_session); - assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); - - // Move first three messages to destination - long fromMessageId = amqMessagesIds.get(0); - long toMessageId = amqMessagesIds.get(2); - _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); - - assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); - assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); - - // Now move a further two messages to destination - fromMessageId = amqMessagesIds.get(7); - toMessageId = amqMessagesIds.get(8); - _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); - assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); - assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); - - assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); - } - - /** - * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. - */ - public void testCopyMessagesBetweenQueues() throws Exception - { - final int numberOfMessagesToSend = 10; - sendMessage(_session, _sourceQueue, numberOfMessagesToSend); - syncSession(_session); - assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); - - // Copy first three messages to destination - long fromMessageId = amqMessagesIds.get(0); - long toMessageId = amqMessagesIds.get(2); - _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); - - assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); - assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - // Now copy a further two messages to destination - fromMessageId = amqMessagesIds.get(7); - toMessageId = amqMessagesIds.get(8); - _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); - assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); - assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); - } - - public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception - { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); - Connection asyncConnection = getConnection(); - asyncConnection.start(); - - final int numberOfMessagesToSend = 50; - sendMessage(_session, _sourceQueue, numberOfMessagesToSend); - syncSession(_session); - assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); - - long fromMessageId = amqMessagesIds.get(0); - long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); - - CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); - AtomicInteger totalConsumed = new AtomicInteger(0); - startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); - - boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); - assertTrue("Did not read half of messages within time allowed", halfwayPointReached); - - _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); - - asyncConnection.stop(); - - // The exact number of messages moved will be non deterministic, as the number of messages processed - // by the consumer cannot be predicited. There is also the possibility that a message can remain - // on the source queue. This situation will arise if a message has been acquired by the consumer, but not - // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs. - // - // The number of messages moved + the number consumed + any messages remaining on source should - // *always* be equal to the number we originally sent. - - int numberOfMessagesReadByConsumer = totalConsumed.intValue(); - int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); - int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); - - LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer - + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue - + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue); - assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue); - } - - public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception - { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); - Connection asyncConnection = getConnection(); - asyncConnection.start(); - - final int numberOfMessagesToSend = 50; - sendMessage(_session, _sourceQueue, numberOfMessagesToSend); - syncSession(_session); - assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); - - List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); - long fromMessageId = amqMessagesIds.get(0); - long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); - - AtomicInteger totalConsumed = new AtomicInteger(0); - CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); - startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); - - _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); - - allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); - assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); - } - - private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, - final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception - { - Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - consumer.setMessageListener(new MessageListener() - { - - @Override - public void onMessage(Message arg0) - { - totalConsumed.incrementAndGet(); - requiredNumberOfMessagesRead.countDown(); - } - }); - } - - private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception - { - MessageConsumer consumer = _session.createConsumer(queue); - - for (int i : expectedIndices) - { - Message message = consumer.receive(1000); - assertNotNull("Expected message with index " + i, message); - assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); - } - - assertNull("Unexpected message encountered", consumer.receive(1000)); - } - - private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception - { - final SortedSet<Long> messageIds = new TreeSet<Long>(); - - final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); - final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); - while(rowItr.hasNext()) - { - final CompositeData row = rowItr.next(); - long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); - messageIds.add(amqMessageId); - } - - return new ArrayList<Long>(messageIds); - } - - /** - * - * Utility method to convert array of Strings in the form x = y into a - * map with key/value x => y. - * - */ - private Map<String,String> headerArrayToMap(final String[] headerArray) - { - final Map<String, String> headerMap = new HashMap<String, String>(); - final List<String> headerList = Arrays.asList(headerArray); - for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) - { - final String nameValuePair = iterator.next(); - final String[] nameValue = nameValuePair.split(" *= *", 2); - headerMap.put(nameValue[0], nameValue[1]); - } - return headerMap; - } - - private void createQueueOnBroker(Destination destination) throws JMSException - { - _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation - } - - private void syncSession(Session session) throws Exception - { - ((AMQSession<?,?>)session).sync(); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java deleted file mode 100644 index 0e2875235f..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java +++ /dev/null @@ -1,480 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; -import org.apache.qpid.management.common.mbeans.ManagedExchange; -import org.apache.qpid.server.logging.AbstractTestLogging; -import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; -import org.apache.qpid.test.utils.JMXTestUtils; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.management.JMException; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Test class to test if any change in the broker JMX code is affesting the management console - * There are some hardcoding of management feature names and parameter names to create a customized - * look in the console. - */ -public class ManagementActorLoggingTest extends AbstractTestLogging -{ - private JMXTestUtils _jmxUtils; - private boolean _closed = false; - - @Override - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this); - _jmxUtils.setUp(); - super.setUp(); - _jmxUtils.open(); - } - - @Override - public void tearDown() throws Exception - { - if(!_closed) - { - _jmxUtils.close(); - } - super.tearDown(); - } - - /** - * Description: - * When a connected client has its connection closed via the Management Console this will be logged as a CON-1002 message. - * Input: - * - * 1. Running Broker - * 2. Connected Client - * 3. Connection is closed via Management Console - * Output: - * - * <date> CON-1002 : Close - * - * Validation Steps: - * 4. The CON ID is correct - * 5. This must be the last CON message for the Connection - * 6. It must be preceded by a CON-1001 for this Connection - * - * @throws Exception - {@see ManagedConnection.closeConnection and #getConnection} - * @throws java.io.IOException - if there is a problem reseting the log monitor - */ - public void testConnectionCloseViaManagement() throws IOException, Exception - { - //Create a connection to the broker - Connection connection = getConnection(); - - // Monitor the connection for an exception being thrown - // this should be a DisconnectionException but it is not this tests - // job to valiate that. Only use the exception as a synchronisation - // to check the log file for the Close message - final CountDownLatch exceptionReceived = new CountDownLatch(1); - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException e) - { - //Failover being attempted. - exceptionReceived.countDown(); - } - }); - - //Remove the connection close from any 0-10 connections - _monitor.markDiscardPoint(); - - // Get a managedConnection - ManagedConnection mangedConnection = _jmxUtils.getManagedObject(ManagedConnection.class, "org.apache.qpid:type=VirtualHost.Connection,*"); - - //Close the connection - mangedConnection.closeConnection(); - - //Wait for the connection to close - assertTrue("Timed out waiting for conneciton to report close", - exceptionReceived.await(2, TimeUnit.SECONDS)); - - //Validate results - List<String> results = waitAndFindMatches("CON-1002"); - - assertEquals("Unexpected Connection Close count", 1, results.size()); - } - - /** - * Description: - * Exchange creation is possible from the Management Console. - * When an exchanged is created in this way then a EXH-1001 create message - * is expected to be logged. - * Input: - * - * 1. Running broker - * 2. Connected Management Console - * 3. Exchange Created via Management Console - * Output: - * - * EXH-1001 : Create : [Durable] Type:<value> Name:<value> - * - * Validation Steps: - * 4. The EXH ID is correct - * 5. The correct tags are present in the message based on the create options - * - * @throws java.io.IOException - if there is a problem reseting the log monitor - * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} - */ - public void testCreateExchangeDirectTransientViaManagementConsole() throws IOException, JMException - { - _monitor.markDiscardPoint(); - - _jmxUtils.createExchange("test", getName(), "direct", false); - - // Validate - - //1 - ID is correct - List<String> results = waitAndFindMatches("EXH-1001"); - - assertEquals("More than one exchange creation found", 1, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct exchange name - assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - - public void testCreateExchangeTopicTransientViaManagementConsole() throws IOException, JMException - { - //Remove any previous exchange declares - _monitor.markDiscardPoint(); - - _jmxUtils.createExchange("test", getName(), "topic", false); - - // Validate - - //1 - ID is correct - List<String> results = waitAndFindMatches("EXH-1001"); - - assertEquals("More than one exchange creation found", 1, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct exchange name - assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - - } - - public void testCreateExchangeFanoutTransientViaManagementConsole() throws IOException, JMException - { - //Remove any previous exchange declares - _monitor.markDiscardPoint(); - - _jmxUtils.createExchange("test", getName(), "fanout", false); - - // Validate - - //1 - ID is correct - List<String> results = waitAndFindMatches("EXH-1001"); - - assertEquals("More than one exchange creation found", 1, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct exchange name - assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - - } - - public void testCreateExchangeHeadersTransientViaManagementConsole() throws IOException, JMException - { - //Remove any previous exchange declares - _monitor.markDiscardPoint(); - - _jmxUtils.createExchange("test", getName(), "headers", false); - - // Validate - - //1 - ID is correct - List<String> results = waitAndFindMatches("EXH-1001"); - - assertEquals("More than one exchange creation found", 1, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct exchange name - assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - - } - - /** - * Description: - * Queue creation is possible from the Management Console. When a queue is created in this way then a QUE-1001 create message is expected to be logged. - * Input: - * - * 1. Running broker - * 2. Connected Management Console - * 3. Queue Created via Management Console - * Output: - * - * <date> QUE-1001 : Create : Transient Owner:<name> - * - * Validation Steps: - * 4. The QUE ID is correct - * 5. The correct tags are present in the message based on the create options - * - * @throws java.io.IOException - if there is a problem reseting the log monitor - * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} - */ - public void testCreateQueueTransientViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createQueue("test", getName(), null, false); - - // Validate - - List<String> results = waitAndFindMatches("QUE-1001"); - - assertEquals("More than one queue creation found", 1, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct queue name - String subject = fromSubject(log); - assertEquals("Incorrect queue name created", getName(), AbstractTestLogSubject.getSlice("qu", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - - /** - * Description: - * The ManagementConsole can be used to delete a queue. When this is done a QUE-1002 Deleted message must be logged. - * Input: - * - * 1. Running Broker - * 2. Queue created on the broker with no subscribers - * 3. Management Console connected - * 4. Queue is deleted via Management Console - * Output: - * - * <date> QUE-1002 : Deleted - * - * Validation Steps: - * 5. The QUE ID is correct - * - * @throws java.io.IOException - if there is a problem reseting the log monitor - * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} - */ - public void testQueueDeleteViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createQueue("test", getName(), null, false); - - ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); - - managedBroker.deleteQueue(getName()); - - List<String> results = waitAndFindMatches("QUE-1002"); - - assertEquals("More than one queue deletion found", 1, results.size()); - - String log = getLog(results.get(0)); - - // Validate correct binding - String subject = fromSubject(log); - assertEquals("Incorrect queue named in delete", getName(), AbstractTestLogSubject.getSlice("qu", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - - } - - /** - * Description: - * The binding of a Queue and an Exchange is done via a Binding. When this Binding is created via the Management Console a BND-1001 Create message will be logged. - * Input: - * - * 1. Running Broker - * 2. Connected Management Console - * 3. Use Management Console to perform binding - * Output: - * - * <date> BND-1001 : Create - * - * Validation Steps: - * 4. The BND ID is correct - * 5. This will be the first message for the given binding - * - * @throws java.io.IOException - if there is a problem reseting the log monitor - * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.createNewBinding} - */ - public void testBindingCreateOnDirectViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createQueue("test", getName(), null, false); - - ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.direct"); - - managedExchange.createNewBinding(getName(), getName()); - - List<String> results = waitAndFindMatches("BND-1001"); - - assertEquals("Unexpected number of bindings logged", 2, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct binding - String subject = fromSubject(log); - assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); - assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - - public void testBindingCreateOnTopicViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createQueue("test", getName(), null, false); - - ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.topic"); - - managedExchange.createNewBinding(getName(), getName()); - - List<String> results = waitAndFindMatches("BND-1001"); - - assertEquals("Unexpected number of bindings logged", 2, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct binding - String subject = fromSubject(log); - assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); - assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - - public void testBindingCreateOnFanoutViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createQueue("test", getName(), null, false); - - ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.fanout"); - - managedExchange.createNewBinding(getName(), getName()); - - List<String> results = waitAndFindMatches("BND-1001"); - - assertEquals("Unexpected number of bindings logged", 2, results.size()); - - String log = getLogMessage(results, 0); - - // Validate correct binding - String subject = fromSubject(log); - assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); - assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - - /** - * Description: - * Bindings can be deleted so that a queue can be rebound with a different set of values. This can be performed via the Management Console - * Input: - * - * 1. Running Broker - * 2. Management Console connected - * 3. Management Console is used to perform unbind. - * Output: - * - * <date> BND-1002 : Deleted - * - * Validation Steps: - * 4. The BND ID is correct - * 5. There must have been a BND-1001 Create message first. - * 6. This will be the last message for the given binding - * - * @throws java.io.IOException - if there is a problem reseting the log monitor or an issue with the JMX Connection - * @throws javax.management.JMException - {@see #createExchange and ManagedBroker.unregisterExchange} - */ - public void testUnRegisterExchangeViaManagementConsole() throws IOException, JMException - { - //Remove any previous queue declares - _monitor.markDiscardPoint(); - - _jmxUtils.createExchange("test", getName(), "direct", false); - - ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); - - managedBroker.unregisterExchange(getName()); - - List<String> results = waitAndFindMatches("EXH-1002"); - - assertEquals("More than one exchange deletion found", 1, results.size()); - - String log = getLog(results.get(0)); - - // Validate correct binding - String subject = fromSubject(log); - assertEquals("Incorrect exchange named in delete", "direct/" + getName(), AbstractTestLogSubject.getSlice("ex", subject)); - - // Validate it was a management actor. - String actor = fromActor(log); - assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java deleted file mode 100644 index 9465749226..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -import javax.jms.Connection; -import java.util.ArrayList; -import java.util.List; - -/** - * Test enabling generation of message statistics on a per-connection basis. - */ -public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - // no statistics generation configured - } - - /** - * Test statistics on a single connection - */ - public void testEnablingStatisticsPerConnection() throws Exception - { - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - List<String> addresses = new ArrayList<String>(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - - addresses.add(mc.getRemoteAddress()); - } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - - Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - test.start(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - continue; - } - mc.setStatisticsEnabled(true); - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - } - - sendUsing(test, 5, 200); - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - else - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - } - assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - test.close(); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java deleted file mode 100644 index 383c4c00a8..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -/** - * Test enabling generation of message statistics on a per-connection basis. - */ -public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker"))); - setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost"))); - setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection"))); - } - - /** - * Just broker statistics. - */ - public void testGenerateBrokerStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Just virtualhost statistics. - */ - public void testGenerateVirtualhostStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Just connection statistics. - */ - public void testGenerateConnectionStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived()); - assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Both broker and virtualhost statistics. - */ - public void testGenerateBrokerAndVirtualhostStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived()); - assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } - - /** - * Broker, virtualhost and connection statistics. - */ - public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception - { - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived()); - assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled()); - } - - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived()); - assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled()); - - assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived()); - assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled()); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java deleted file mode 100644 index bdfd1e2c14..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import java.util.ArrayList; -import java.util.List; - -/** - * Test statistics for delivery and receipt. - */ -public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", "true"); - setConfigurationProperty("statistics.generation.virtualhosts", "true"); - setConfigurationProperty("statistics.generation.connections", "true"); - } - - public void testDeliveryAndReceiptStatistics() throws Exception - { - ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); - - sendUsing(_test, 5, 200); - Thread.sleep(1000); - - List<String> addresses = new ArrayList<String>(); - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); - - addresses.add(mc.getRemoteAddress()); - } - - assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered()); - assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered()); - assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); - - Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - test.start(); - receiveUsing(test, 5); - - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - if (addresses.contains(mc.getRemoteAddress())) - { - assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived()); - } - else - { - assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered()); - assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered()); - assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived()); - assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived()); - } - } - assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered()); - assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered()); - assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived()); - assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived()); - - test.close(); - } - - protected void receiveUsing(Connection con, int number) throws Exception - { - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - createQueue(session); - MessageConsumer consumer = session.createConsumer(_queue); - for (int i = 0; i < number; i++) - { - Message msg = consumer.receive(1000); - assertNotNull("Message " + i + " was not received", msg); - } - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java deleted file mode 100644 index de4567624d..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedConnection; - -import javax.jms.Connection; - -/** - * Test generation of message statistics. - */ -public class MessageStatisticsTest extends MessageStatisticsTestCase -{ - public void configureStatistics() throws Exception - { - setConfigurationProperty("statistics.generation.broker", "true"); - setConfigurationProperty("statistics.generation.virtualhosts", "true"); - setConfigurationProperty("statistics.generation.connections", "true"); - } - - /** - * Test message totals. - */ - public void testMessageTotals() throws Exception - { - sendUsing(_test, 10, 100); - sendUsing(_dev, 20, 100); - sendUsing(_local, 5, 100); - sendUsing(_local, 5, 100); - sendUsing(_local, 5, 100); - Thread.sleep(2000); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - ManagedBroker local = _jmxUtils.getManagedBroker("localhost"); - - if (!isBroker010()) - { - long total = 0; - long data = 0; - for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) - { - total += mc.getTotalMessagesReceived(); - data += mc.getTotalDataReceived(); - } - assertEquals("Incorrect connection total", 45, total); - assertEquals("Incorrect connection data", 4500, data); - } - assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived()); - - if (!isBroker010()) - { - long testTotal = 0; - long testData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - testTotal += mc.getTotalMessagesReceived(); - testData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 10, testTotal); - assertEquals("Incorrect test connection data", 1000, testData); - } - assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived()); - - if (!isBroker010()) - { - long devTotal = 0; - long devData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("development")) - { - devTotal += mc.getTotalMessagesReceived(); - devData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 20, devTotal); - assertEquals("Incorrect test connection data", 2000, devData); - } - assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived()); - assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived()); - - if (!isBroker010()) - { - long localTotal = 0; - long localData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost")) - { - localTotal += mc.getTotalMessagesReceived(); - localData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test connection total", 15, localTotal); - assertEquals("Incorrect test connection data", 1500, localData); - } - assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived()); - assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived()); - } - - /** - * Test message totals when a connection is closed. - */ - public void testMessageTotalsWithClosedConnections() throws Exception - { - Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - temp.start(); - - sendUsing(_test, 10, 100); - sendUsing(temp, 10, 100); - sendUsing(_test, 10, 100); - Thread.sleep(2000); - - temp.close(); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - - if (!isBroker010()) - { - long total = 0; - long data = 0; - for (ManagedConnection mc : _jmxUtils.getAllManagedConnections()) - { - total += mc.getTotalMessagesReceived(); - data += mc.getTotalDataReceived(); - } - assertEquals("Incorrect active connection total", 20, total); - assertEquals("Incorrect active connection data", 2000, data); - } - assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived()); - - if (!isBroker010()) - { - long testTotal = 0; - long testData = 0; - for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) - { - testTotal += mc.getTotalMessagesReceived(); - testData += mc.getTotalDataReceived(); - } - assertEquals("Incorrect test active connection total", 20, testTotal); - assertEquals("Incorrect test active connection data", 20 * 100, testData); - } - assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived()); - } - - /** - * Test message totals when a vhost has its statistics reset - */ - public void testMessageTotalVhostReset() throws Exception - { - sendUsing(_test, 10, 10); - sendUsing(_dev, 10, 10); - Thread.sleep(2000); - - ManagedBroker test = _jmxUtils.getManagedBroker("test"); - ManagedBroker dev = _jmxUtils.getManagedBroker("development"); - - assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); - - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); - - test.resetStatistics(); - - assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived()); - assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived()); - assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived()); - assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived()); - - assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived()); - assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived()); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java deleted file mode 100644 index 45200ba476..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.management.jmx; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -/** - * Test generation of message statistics. - */ -public abstract class MessageStatisticsTestCase extends QpidBrokerTestCase -{ - protected static final String USER = "admin"; - - protected JMXTestUtils _jmxUtils; - protected Connection _test, _dev, _local; - protected String _queueName = "statistics"; - protected Destination _queue; - protected String _brokerUrl; - - @Override - public void setUp() throws Exception - { - _jmxUtils = new JMXTestUtils(this, USER, USER); - _jmxUtils.setUp(); - - configureStatistics(); - - super.setUp(); - - _brokerUrl = getBroker().toString(); - _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); - _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development"); - _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost"); - - _test.start(); - _dev.start(); - _local.start(); - - _jmxUtils.open(); - } - - protected void createQueue(Session session) throws AMQException, JMSException - { - _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName); - if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue)) - { - ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null); - ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName)); - } - } - - - @Override - public void tearDown() throws Exception - { - _jmxUtils.close(); - - _test.close(); - _dev.close(); - _local.close(); - - super.tearDown(); - } - - /** - * Configure statistics generation properties on the broker. - */ - public abstract void configureStatistics() throws Exception; - - protected void sendUsing(Connection con, int number, int size) throws Exception - { - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - createQueue(session); - MessageProducer producer = session.createProducer(_queue); - String content = new String(new byte[size]); - TextMessage msg = session.createTextMessage(content); - for (int i = 0; i < number; i++) - { - producer.send(msg); - } - } - - /** - * Asserts that the actual value is within the expected value plus or - * minus the given error. - */ - public void assertApprox(String message, double error, double expected, double actual) - { - double min = expected * (1.0d - error); - double max = expected * (1.0d + error); - String assertion = String.format("%s: expected %f +/- %d%%, actual %f", - message, expected, (int) (error * 100.0d), actual); - assertTrue(assertion, actual > min && actual < max); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java b/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java new file mode 100644 index 0000000000..c8116d8cef --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.Session; + +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XASession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; + +import org.apache.qpid.client.AMQXAResource; + +import org.apache.qpid.ra.QpidRAConnectionFactoryImpl; +import org.apache.qpid.ra.QpidRAManagedConnectionFactory; +import org.apache.qpid.ra.QpidResourceAdapter; + +public class QpidRAXAResourceTest extends QpidBrokerTestCase +{ + private static final String FACTORY_NAME = "default"; + private static final String BROKER_PORT = "15672"; + private static final String URL = "amqp://guest:guest@client/test?brokerlist='tcp://localhost:" + BROKER_PORT + "?sasl_mechs='PLAIN''"; + + public void testXAResourceIsSameRM() throws Exception + { + QpidResourceAdapter ra = new QpidResourceAdapter(); + QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory(); + mcf.setConnectionURL(URL); + mcf.setResourceAdapter(ra); + QpidRAManagedConnection mc = (QpidRAManagedConnection)mcf.createManagedConnection(null, null); + AMQXAResource xa1 = (AMQXAResource)mc.getXAResource(); + + XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME); + XAConnection connection = factory.createXAConnection("guest", "guest"); + XASession s2 = connection.createXASession(); + AMQXAResource xaResource = (AMQXAResource)connection.createXASession().getXAResource(); + + assertTrue("QpidRAXAResource and XAResource should be from the same RM", xa1.isSameRM(xaResource)); + assertTrue("XAResource and QpidRAXAResource should be from the same RM", xaResource.isSameRM(xa1)); + + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java b/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java index 0e3a658e32..e8d72c13bd 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java @@ -38,6 +38,13 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase // No-op, we call super.setUp() from test methods after appropriate config overrides } + private void clearProtocolSupportManipulations() + { + //Remove the QBTC provided protocol manipulations, giving only the protocols which default to enabled + setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES, null); + setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES, null); + } + /** * Test that 0-10, 0-9-1, 0-9, and 0-8 support is present when no * attempt has yet been made to disable them, and forcing the client @@ -46,7 +53,8 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase */ public void testDefaultProtocolSupport() throws Exception { - //Start the broker without modifying its supported protocols + clearProtocolSupportManipulations(); + super.setUp(); //Verify requesting a 0-10 connection works @@ -74,11 +82,13 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase connection.close(); } - public void testDisabling010() throws Exception + public void testDisabling010and10() throws Exception { - //disable 0-10 support - setConfigurationProperty("connector.amqp10enabled", "false"); - setConfigurationProperty("connector.amqp010enabled", "false"); + clearProtocolSupportManipulations(); + + //disable 0-10 and 1-0 support + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false"); super.setUp(); @@ -90,9 +100,11 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase connection.close(); } - public void testDisabling091and010() throws Exception + public void testDisabling091and010and10() throws Exception { - //disable 0-91 and 0-10 support + clearProtocolSupportManipulations(); + + //disable 0-91 and 0-10 and 1-0 support setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false"); setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false"); @@ -107,9 +119,11 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase connection.close(); } - public void testDisabling09and091and010() throws Exception + public void testDisabling09and091and010and10() throws Exception { - //disable 0-9, 0-91 and 0-10 support + clearProtocolSupportManipulations(); + + //disable 0-9, 0-91, 0-10 and 1-0 support setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP09ENABLED, "false"); setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false"); setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); @@ -127,6 +141,8 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase public void testConfiguringReplyingToUnsupported010ProtocolInitiationWith09insteadOf091() throws Exception { + clearProtocolSupportManipulations(); + //disable 0-10 support, and set the default unsupported protocol initiation reply to 0-9 setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_9"); @@ -147,4 +163,72 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion()); connection.close(); } + + public void testProtocolInclusionThroughQBTCSystemPropertiesOverridesProtocolExclusion() throws Exception + { + testProtocolInclusionOverridesProtocolExclusion(false); + } + + public void testProtocolInclusionThroughConfigOverridesProtocolExclusion() throws Exception + { + testProtocolInclusionOverridesProtocolExclusion(true); + } + + private void testProtocolInclusionOverridesProtocolExclusion(boolean useConfig) throws Exception + { + clearProtocolSupportManipulations(); + + //selectively exclude 0-10 and 1-0 on the test port + setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES,"--exclude-0-10 @PORT --exclude-1-0 @PORT"); + + super.setUp(); + + //Verify initially requesting a 0-10 connection negotiates a 0-9-1 connection + setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + AMQConnection connection = (AMQConnection) getConnection(); + assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion()); + connection.close(); + + stopBroker(); + + if(useConfig) + { + //selectively include 0-10 support again on the test port through config + setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort())); + } + else + { + //selectively include 0-10 support again on the test port through QBTC sys props + setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES,"--include-0-10 @PORT"); + } + + startBroker(); + + //Verify requesting a 0-10 connection now returns one + setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + connection = (AMQConnection) getConnection(); + assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion()); + connection.close(); + } + + public void testProtocolInclusionOverridesProtocolDisabling() throws Exception + { + clearProtocolSupportManipulations(); + + //disable 0-10 and 1-0 + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false"); + setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false"); + + //selectively include 0-10 support again on the test port + setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort())); + + super.setUp(); + + //Verify initially requesting a 0-10 connection still works + setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + AMQConnection connection = (AMQConnection) getConnection(); + assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion()); + connection.close(); + } + }
\ No newline at end of file diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java deleted file mode 100644 index c8a6d02761..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging; - - -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.util.LogMonitor; - -import java.io.File; -import java.util.List; - -/** - * Management Console Test Suite - * - * The Management Console test suite validates that the follow log messages as specified in the Functional Specification. - * - * This suite of tests validate that the management console messages occur correctly and according to the following format: - * - * MNG-1001 : Startup - * MNG-1002 : Starting : <service> : Listening on port <Port> - * MNG-1003 : Shutting down : <service> : port <Port> - * MNG-1004 : Ready - * MNG-1005 : Stopped - * MNG-1006 : Using SSL Keystore : <path> - * MNG-1007 : Open : User <username> - * MNG-1008 : Close : User <username> - */ -public class ManagementLoggingTest extends AbstractTestLogging -{ - private static final String MNG_PREFIX = "MNG-"; - - public void setUp() throws Exception - { - setLogMessagePrefix(); - - // We either do this here or have a null check in tearDown. - // As when this test is run against profiles other than java it will NPE - _monitor = new LogMonitor(_outputFile); - //We explicitly do not call super.setUp as starting up the broker is - //part of the test case. - - } - - /** - * Description: - * Using the startup configuration validate that the management startup - * message is logged correctly. - * Input: - * Standard configuration with management enabled - * Output: - * - * <date> MNG-1001 : Startup - * - * Constraints: - * This is the FIRST message logged by MNG - * Validation Steps: - * - * 1. The BRK ID is correct - * 2. This is the FIRST message logged by MNG - */ - public void testManagementStartupEnabled() throws Exception - { - // This test only works on java brokers - if (isJavaBroker()) - { - startBrokerAndCreateMonitor(true, false); - - // Ensure we have received the MNG log msg. - waitForMessage("MNG-1001"); - - List<String> results = findMatches(MNG_PREFIX); - // Validation - - assertTrue("MNGer message not logged", results.size() > 0); - - String log = getLogMessage(results, 0); - - //1 - validateMessageID("MNG-1001", log); - - //2 - //There will be 2 copies of the startup message (one via SystemOut, and one via Log4J) - results = findMatches("MNG-1001"); - assertEquals("Unexpected startup message count.", - 2, results.size()); - - //3 - assertEquals("Startup log message is not 'Startup'.", "Startup", - getMessageString(log)); - } - } - - /** - * Description: - * Verify that when management is disabled in the configuration file the - * startup message is not logged. - * Input: - * Standard configuration with management disabled - * Output: - * NO MNG messages - * Validation Steps: - * - * 1. Validate that no MNG messages are produced. - */ - public void testManagementStartupDisabled() throws Exception - { - if (isJavaBroker()) - { - startBrokerAndCreateMonitor(false, false); - - List<String> results = findMatches(MNG_PREFIX); - // Validation - - assertEquals("MNGer messages logged", 0, results.size()); - } - } - - /** - * The two MNG-1002 messages are logged at the same time so lets test them - * at the same time. - * - * Description: - * Using the default configuration validate that the RMI Registry socket is - * correctly reported as being opened - * - * Input: - * The default configuration file - * Output: - * - * <date> MESSAGE MNG-1002 : Starting : RMI Registry : Listening on port 8999 - * - * Constraints: - * The RMI ConnectorServer and Registry log messages do not have a prescribed order - * Validation Steps: - * - * 1. The MNG ID is correct - * 2. The specified port is the correct '8999' - * - * Description: - * Using the default configuration validate that the RMI ConnectorServer - * socket is correctly reported as being opened - * - * Input: - * The default configuration file - * Output: - * - * <date> MESSAGE MNG-1002 : Starting : RMI ConnectorServer : Listening on port 9099 - * - * Constraints: - * The RMI ConnectorServer and Registry log messages do not have a prescribed order - * Validation Steps: - * - * 1. The MNG ID is correct - * 2. The specified port is the correct '9099' - */ - public void testManagementStartupRMIEntries() throws Exception - { - if (isJavaBroker()) - { - startBrokerAndCreateMonitor(true, false); - - List<String> results = waitAndFindMatches("MNG-1002"); - // Validation - - //There will be 4 startup messages (two via SystemOut, and two via Log4J) - assertEquals("Unexpected MNG-1002 message count", 4, results.size()); - - String log = getLogMessage(results, 0); - - //1 - validateMessageID("MNG-1002", log); - - //Check the RMI Registry port is as expected - int mPort = getManagementPort(getPort()); - assertTrue("RMI Registry port not as expected(" + mPort + ").:" + getMessageString(log), - getMessageString(log).endsWith(String.valueOf(mPort))); - - log = getLogMessage(results, 2); - - //1 - validateMessageID("MNG-1002", log); - - // We expect the RMI Registry port (the defined 'management port') to be - // 100 lower than the JMX RMIConnector Server Port (the actual JMX server) - int jmxPort = mPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET; - assertTrue("JMX RMIConnectorServer port not as expected(" + jmxPort + ").:" + getMessageString(log), - getMessageString(log).endsWith(String.valueOf(jmxPort))); - } - } - - /** - * Description: - * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006 - * Input: - * Management SSL enabled default configuration. - * Output: - * - * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks - * - * Validation Steps: - * - * 1. The MNG ID is correct - * 2. The keystore path is as specified in the configuration - */ - public void testManagementStartupSSLKeystore() throws Exception - { - if (isJavaBroker()) - { - startBrokerAndCreateMonitor(true, true); - - List<String> results = waitAndFindMatches("MNG-1006"); - - assertTrue("MNGer message not logged", results.size() > 0); - - String log = getLogMessage(results, 0); - - //1 - validateMessageID("MNG-1006", log); - - // Validate we only have two MNG-1002 (one via stdout, one via log4j) - results = findMatches("MNG-1006"); - assertEquals("Upexpected SSL Keystore message count", - 2, results.size()); - - // Validate the keystore path is as expected - assertTrue("SSL Keystore entry expected.:" + getMessageString(log), - getMessageString(log).endsWith(new File(getConfigurationStringProperty("management.ssl.keyStorePath")).getName())); - } - } - - /** - * Description: Tests the management connection open/close are logged correctly. - * - * Output: - * - * <date> MESSAGE MNG-1007 : Open : User <username> - * <date> MESSAGE MNG-1008 : Close : User <username> - * - * Validation Steps: - * - * 1. The MNG ID is correct - * 2. The message and username are correct - */ - public void testManagementUserOpenClose() throws Exception - { - if (isJavaBroker()) - { - startBrokerAndCreateMonitor(true, false); - - final JMXTestUtils jmxUtils = new JMXTestUtils(this); - List<String> openResults = null; - List<String> closeResults = null; - try - { - jmxUtils.setUp(); - jmxUtils.open(); - openResults = waitAndFindMatches("MNG-1007"); - } - finally - { - if (jmxUtils != null) - { - jmxUtils.close(); - closeResults = waitAndFindMatches("MNG-1008"); - } - } - - assertNotNull("Management Open results null", openResults.size()); - assertEquals("Management Open logged unexpected number of times", 1, openResults.size()); - - assertNotNull("Management Close results null", closeResults.size()); - assertEquals("Management Close logged unexpected number of times", 1, closeResults.size()); - - final String openMessage = getMessageString(getLogMessage(openResults, 0)); - assertTrue("Unexpected open message " + openMessage, openMessage.endsWith("Open : User admin")); - final String closeMessage = getMessageString(getLogMessage(closeResults, 0)); - assertTrue("Unexpected close message " + closeMessage, closeMessage.endsWith("Close : User admin")); - } - } - - private void startBrokerAndCreateMonitor(boolean managementEnabled, boolean useManagementSSL) throws Exception - { - //Ensure management is on - setConfigurationProperty("management.enabled", String.valueOf(managementEnabled)); - - if(useManagementSSL) - { - // This test requires we have an ssl connection - setConfigurationProperty("management.ssl.enabled", "true"); - } - - startBroker(); - - // Now we can create the monitor as _outputFile will now be defined - _monitor = new LogMonitor(_outputFile); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java index ae7be6f7f4..0e59e9cceb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java @@ -22,7 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; - +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -37,82 +38,65 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class ConflationQueueTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 1500; - - - private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class); + private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class); - - - protected final String VHOST = "/test"; - protected final String QUEUE = "ConflationQueue"; + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; private static final int MSG_COUNT = 400; - private Connection producerConnection; - private MessageProducer producer; - private Session producerSession; - private Queue queue; - private Connection consumerConnection; - private Session consumerSession; - - - private MessageConsumer consumer; + private String _queueName; + private Queue _queue; + private Connection _producerConnection; + private MessageProducer _producer; + private Session _producerSession; + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; protected void setUp() throws Exception { super.setUp(); - producerConnection = getConnection(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - producerConnection.start(); - - - } - - protected void tearDown() throws Exception - { - producerConnection.close(); - consumerConnection.close(); - super.tearDown(); + _queueName = getTestQueueName(); + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void testConflation() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - producer.close(); - producerSession.close(); - producerConnection.close(); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -122,40 +106,33 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - - } - public void testConflationWithRelease() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -165,31 +142,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -199,39 +176,34 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } - public void testConflationWithReleaseAfterNewPublish() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -241,35 +213,35 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumer.close(); + _consumer.close(); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); // this causes the "old" messages to be released - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -279,40 +251,54 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + } + + public void testConflatedQueueDepth() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); } + final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true); + + assertEquals(10, queueDepth); } public void testConflationBrowser() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -322,62 +308,53 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } messages.clear(); - producer.send(nextMessage(MSG_COUNT, producerSession)); + _producer.send(nextMessage(MSG_COUNT, _producerSession)); - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } assertEquals("Unexpected number of messages received",1,messages.size()); - assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg")); - - - producer.close(); - producerSession.close(); - producerConnection.close(); - + assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); } - public void testConflation2Browsers() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments); - queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE); - ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); - + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); List<Message> messages = new ArrayList<Message>(); List<Message> messages2 = new ArrayList<Message>(); - Message received = consumer.receive(1000); + Message received = _consumer.receive(1000); Message received2 = consumer2.receive(1000); while(received!=null || received2!=null) @@ -392,7 +369,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } - received = consumer.receive(1000); + received = _consumer.receive(1000); received2 = consumer2.receive(1000); } @@ -403,33 +380,195 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - producer.close(); - producerSession.close(); - producerConnection.close(); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testParallelProductionAndConsumption() throws Exception + { + createConflationQueue(_producerSession); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); + messageProducer2.startSendingMessages(); + + Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1); + messageProducer1.join(); + messageProducer2.join(); + final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); } + private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + _consumerConnection = getConnection(); + int smallPrefetchToEncourageConflation = 1; + _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); - private Message nextMessage(int msg, Session producerSession) throws JMSException + LOGGER.info("Starting to receive"); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while(numberOfShutdownsReceived < 2) + { + message = _consumer.receive(10000); + assertNotNull(message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException { - Message send = producerSession.createTextMessage("Message: " + msg); + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } - send.setStringProperty("key", String.valueOf(msg % 10)); - send.setIntProperty("msg", msg); + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; - return send; + private final String _threadName; + + private volatile Exception _exception; + + private Thread _thread; + private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); + + public BackgroundMessageProducer(String threadName) + { + _threadName = threadName; + } + + public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + public Map<String, Integer> getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); + } + + public void startSendingMessages() + { + Runnable messageSender = new Runnable() + { + @Override + public void run() + { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + backgroundProducer.send(shutdownMessage); + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + throw new RuntimeException(e); + } + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } + + public void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } + } + + private void createConflationQueue(Session session) throws AMQException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new AMQQueue("amq.direct", _queueName); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue); } + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + return nextMessage(msg, producerSession, 10); + } -} + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); + return send; + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java index ab0d88c737..782709b24f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java @@ -333,7 +333,7 @@ public class ModelTest extends QpidBrokerTestCase queueName)); assertEquals(queueName, managedQueue.getName()); - assertEquals(String.valueOf(owner), managedQueue.getOwner()); + assertEquals(owner, managedQueue.getOwner()); assertEquals(durable, managedQueue.isDurable()); assertEquals(autoDelete, managedQueue.isAutoDelete()); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java index 6e4f12b9f3..ceff2b998a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.security.acl; import org.apache.qpid.management.common.mbeans.ServerInformation; -import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.security.access.ObjectType; import org.apache.qpid.test.utils.JMXTestUtils; @@ -30,7 +29,7 @@ import java.lang.management.RuntimeMXBean; * Tests that access to the JMX interface is governed only by {@link ObjectType#METHOD}/{@link ObjectType#ALL} * rules and AMQP rights have no effect. * - * Ensures that objects outside the Qpid domain ({@link ManagedObject#DOMAIN}) are not governed by the ACL model. + * Ensures that objects outside the Qpid domain are not governed by the ACL model. */ public class ExternalACLJMXTest extends AbstractACLTestCase { diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java new file mode 100644 index 0000000000..858b32c24c --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.auth.manager; + +import javax.jms.Connection; +import javax.jms.JMSException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class MultipleAuthenticationManagersTest extends QpidBrokerTestCase +{ + private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks"; + private static final String KEYSTORE_PASSWORD = "password"; + private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks"; + private static final String TRUSTSTORE_PASSWORD = "password"; + + @Override + protected void setUp() throws Exception + { + setConfigurationProperty("connector.ssl.enabled", "true"); + setConfigurationProperty("connector.ssl.sslOnly", "false"); + setConfigurationProperty("security.anonymous-auth-manager", ""); + setConfigurationProperty("security.default-auth-manager", "PrincipalDatabaseAuthenticationManager"); + setConfigurationProperty("security.port-mappings.port-mapping.port", String.valueOf(QpidBrokerTestCase.DEFAULT_SSL_PORT)); + setConfigurationProperty("security.port-mappings.port-mapping.auth-manager", "AnonymousAuthenticationManager"); + + // set the ssl system properties + setSystemProperty("javax.net.ssl.keyStore", KEYSTORE); + setSystemProperty("javax.net.ssl.keyStorePassword", KEYSTORE_PASSWORD); + setSystemProperty("javax.net.ssl.trustStore", TRUSTSTORE); + setSystemProperty("javax.net.ssl.trustStorePassword", TRUSTSTORE_PASSWORD); + setSystemProperty("javax.net.debug", "ssl"); + super.setUp(); + } + + private Connection getAnonymousSSLConnection() throws Exception + { + String url = "amqp://:@test/?brokerlist='tcp://localhost:%s?ssl='true''"; + + url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT); + + return new AMQConnection(url); + + } + + private Connection getAnonymousConnection() throws Exception + { + String url = "amqp://:@test/?brokerlist='tcp://localhost:%s'"; + + url = String.format(url,QpidBrokerTestCase.DEFAULT_PORT); + + return new AMQConnection(url); + + } + + + public void testMultipleAuthenticationManagers() throws Exception + { + try + { + Connection conn = getConnection(); + assertNotNull("Connection unexpectedly null", conn); + } + catch(JMSException e) + { + fail("Should be able to create a connection with credentials to the standard port. " + e.getMessage()); + } + + try + { + Connection conn = getAnonymousSSLConnection(); + assertNotNull("Connection unexpectedly null", conn); + } + catch(JMSException e) + { + fail("Should be able to create a anonymous connection to the SSL port. " + e.getMessage()); + } + + try + { + Connection conn = getAnonymousConnection(); + fail("Should not be able to create anonymous connection to the standard port"); + } + catch(AMQException e) + { + // pass + } + + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/java/systests/src/main/java/org/apache/qpid/server/stats/StatisticsReportingTest.java index 786ef11956..c38fcd9199 100644 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/stats/StatisticsReportingTest.java @@ -18,30 +18,75 @@ * under the License. * */ -package org.apache.qpid.management.jmx; +package org.apache.qpid.server.stats; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.LogMonitor; import java.util.List; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + /** - * Test generation of message statistics reporting. + * Test generation of message/data statistics reporting and the ability + * to control from the configuration file. */ -public class MessageStatisticsReportingTest extends MessageStatisticsTestCase +public class StatisticsReportingTest extends QpidBrokerTestCase { protected LogMonitor _monitor; - - public void configureStatistics() throws Exception + protected static final String USER = "admin"; + + protected Connection _test, _dev, _local; + protected String _queueName = "statistics"; + protected Destination _queue; + protected String _brokerUrl; + + @Override + public void setUp() throws Exception { setConfigurationProperty("statistics.generation.broker", "true"); setConfigurationProperty("statistics.generation.virtualhosts", "true"); - + if (getName().equals("testEnabledStatisticsReporting")) { setConfigurationProperty("statistics.reporting.period", "10"); } - + _monitor = new LogMonitor(_outputFile); + + super.setUp(); + + _brokerUrl = getBroker().toString(); + _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development"); + _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost"); + + _test.start(); + _dev.start(); + _local.start(); + + } + + @Override + public void tearDown() throws Exception + { + _test.close(); + _dev.close(); + _local.close(); + + super.tearDown(); } /** @@ -52,14 +97,14 @@ public class MessageStatisticsReportingTest extends MessageStatisticsTestCase sendUsing(_test, 10, 100); sendUsing(_dev, 20, 100); sendUsing(_local, 15, 100); - + Thread.sleep(10 * 1000); // 15s - + List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); - + assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size()); assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size()); assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size()); @@ -74,17 +119,40 @@ public class MessageStatisticsReportingTest extends MessageStatisticsTestCase sendUsing(_test, 10, 100); sendUsing(_dev, 20, 100); sendUsing(_local, 15, 100); - + Thread.sleep(10 * 1000); // 15s - + List<String> brokerStatsData = _monitor.findMatches("BRK-1008"); List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009"); List<String> vhostStatsData = _monitor.findMatches("VHT-1003"); List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004"); - + assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size()); assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size()); assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size()); assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size()); } + + private void sendUsing(Connection con, int number, int size) throws Exception + { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + createQueue(session); + MessageProducer producer = session.createProducer(_queue); + String content = new String(new byte[size]); + TextMessage msg = session.createTextMessage(content); + for (int i = 0; i < number; i++) + { + producer.send(msg); + } + } + + private void createQueue(Session session) throws AMQException, JMSException + { + _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName); + if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue)) + { + ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName)); + } + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java new file mode 100644 index 0000000000..07965cfa95 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -0,0 +1,180 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageContentSource; + +public class QuotaMessageStore extends NullMessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private long _totalStoreSize;; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + private final StateManager _stateManager; + private final EventManager _eventManager = new EventManager(); + + public QuotaMessageStore() + { + _stateManager = new StateManager(_eventManager); + } + + @Override + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) + throws Exception + { + _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, + _persistentSizeHighThreshold); + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + _stateManager.attainState(State.INITIALISING); + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + { + _stateManager.attainState(State.INITIALISED); + } + + @Override + public void activate() throws Exception + { + _stateManager.attainState(State.ACTIVATING); + _stateManager.attainState(State.ACTIVE); + } + + @SuppressWarnings("unchecked") + @Override + public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData) + { + final long id = _messageId.getAndIncrement(); + return new StoredMemoryMessage(id, metaData); + } + + @Override + public Transaction newTransaction() + { + return new Transaction() + { + private AtomicLong _storeSizeIncrease = new AtomicLong(); + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize()); + } + + @Override + public void commitTran() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + } + + @Override + public void abortTran() throws AMQStoreException + { + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + }; + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public void close() throws Exception + { + _stateManager.attainState(State.CLOSING); + _closed.getAndSet(true); + _stateManager.attainState(State.CLOSED); + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + + private void storedSizeChange(final int delta) + { + if(_persistentSizeHighThreshold > 0) + { + synchronized (this) + { + long newSize = _totalStoreSize += delta; + if(!_limitBusted && newSize > _persistentSizeHighThreshold) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + else if(_limitBusted && newSize < _persistentSizeHighThreshold) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + } + } + } + + @Override + public String getStoreType() + { + return "QUOTA"; + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index f2d4a513be..9db04b64b3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; @@ -370,4 +369,10 @@ public class SlowMessageStore implements MessageStore return _realStore.getStoreLocation(); } + @Override + public String getStoreType() + { + return "SLOW"; + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java deleted file mode 100644 index 6497a640d2..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -public class SlowMessageStoreFactory implements MessageStoreFactory -{ - - @Override - public MessageStore createMessageStore() - { - return new SlowMessageStore(); - } - - @Override - public String getStoreClassName() - { - return SlowMessageStore.class.getSimpleName(); - } - -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java new file mode 100644 index 0000000000..9fb1db3a4f --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -0,0 +1,343 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class StoreOverfullTest extends QpidBrokerTestCase +{ + /** Number of messages to send*/ + public static final int TEST_SIZE = 15; + + /** Message payload*/ + private static final byte[] BYTE_32K = new byte[32*1024]; + + private Connection _producerConnection; + private Connection _consumerConnection; + private Session _producerSession; + private Session _consumerSession; + private MessageProducer _producer; + private MessageConsumer _consumer; + private Queue _queue; + + private static final int OVERFULL_SIZE = 400000; + private static final int UNDERFULL_SIZE = 350000; + + public void setUp() throws Exception + { + setConfigurationProperty("virtualhosts.virtualhost.test.store.class", QuotaMessageStore.class.getName()); + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", String.valueOf(UNDERFULL_SIZE)); + + super.setUp(); + + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producerConnection.start(); + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + try + { + _producerConnection.close(); + _consumerConnection.close(); + } + finally + { + super.tearDown(); + } + } + + /** + * Test: + * + * Send > threshold amount of data : Sender is blocked + * Remove 90% of data : Sender is unblocked + * + */ + public void testCapacityExceededCausesBlock() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + _queue = getTestQueue(); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + int mostMessages = (int) (0.9 * sentCount); + for(int i = 0; i < mostMessages; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + long targetTime = System.currentTimeMillis() + 5000l; + while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) + { + Thread.sleep(100l); + } + + assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount); + + for(int i = mostMessages; i < TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); + assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException()); + } + + /** + * Two producers on different queues + */ + public void testCapacityExceededCausesBlockTwoConnections() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2"); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)queue2); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(queue2); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + + while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + _consumer = _consumerSession.createConsumer(_queue); + MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); + _consumerConnection.start(); + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null + && consumer2.receive(1000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + } + + /** + * New producers are blocked + */ + public void testCapacityExceededCausesBlockNewConnection() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(2000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod, + AtomicInteger sentMessages) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages); + new Thread(sender).start(); + return sender; + } + + private class MessageSender implements Runnable + { + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); + private long _sleepPeriod; + private final AtomicInteger _sentMessages; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + _sentMessages = sentMessages; + } + + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages); + } + catch (JMSException e) + { + _exception = e; + _exceptionThrownLatch.countDown(); + } + } + + public Exception getException() + { + return _exception; + } + + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + sentMessages.incrementAndGet(); + + try + { + ((AMQSession<?,?>)producerSession).sync(); + } + catch (AMQException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_32K); + send.setIntProperty("msg", msg); + return send; + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java index e06ed6e171..fa36d73283 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.test.client; import org.apache.qpid.client.AMQDestination; @@ -16,26 +36,6 @@ import javax.jms.TextMessage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ public class DupsOkTest extends QpidBrokerTestCase { diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index 2c7f426306..626592dc10 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -233,7 +233,7 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener { } - assertTrue("Connection should be closed", _connection.isClosed()); + assertFalse("Connection should not be closed", _connection.isClosed()); } public void testSelectorWithJMSMessageID() throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 91f5cb7770..ee81e7c372 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -54,7 +54,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.factoryclass", "org.apache.qpid.server.store.SlowMessageStoreFactory"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); setConfigurationProperty("management.enabled", "false"); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index fa0fe7e0b5..bc1eead8b4 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -41,6 +41,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -67,6 +68,9 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase private static final int MAX_DELIVERY_COUNT = 2; private CountDownLatch _awaitCompletion; + /** index numbers of messages to be redelivered */ + private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14); + public void setUp() throws Exception { //enable DLQ/maximumDeliveryCount support for all queues at the vhost level @@ -144,13 +148,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testAsynchronousClientAckSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); - - doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false); + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false); } /** @@ -159,13 +157,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testAsynchronousTransactedSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); - - doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false); } /** @@ -174,13 +166,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testAsynchronousAutoAckSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); - - doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false); + doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false); } /** @@ -189,13 +175,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testAsynchronousDupsOkSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); - - doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false); + doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false); } /** @@ -204,13 +184,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testSynchronousClientAckSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(3); - redeliverMsgs.add(14); - - doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false); + doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false); } /** @@ -219,27 +193,22 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase */ public void testSynchronousTransactedSession() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); - - doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); } public void testDurableSubscription() throws Exception { - final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>(); - redeliverMsgs.add(1); - redeliverMsgs.add(2); - redeliverMsgs.add(5); - redeliverMsgs.add(14); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true); + } + + public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception + { + restartBroker(); - doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true); + doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false); } - public void doTest(final int deliveryMode, final ArrayList<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception + private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception { final Connection clientConnection = getConnection(); @@ -311,7 +280,6 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase restartBroker(); final Connection clientConnection2 = getConnection(); - final Session clientSession2 = clientConnection2.createSession(transacted, deliveryMode); clientConnection2.start(); //verify the messages on the DLQ @@ -406,7 +374,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase } private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, - final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException + final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException { if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE || deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE) @@ -567,7 +535,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase } private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount, - final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException + final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException { if(deliveryMode == Session.AUTO_ACKNOWLEDGE || deliveryMode == Session.DUPS_OK_ACKNOWLEDGE @@ -637,7 +605,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase //sleep then do a synchronous op to give the broker //time to resend all the messages Thread.sleep(500); - ((AMQSession) session).sync(); + ((AMQSession<?,?>) session).sync(); break; } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java index 5b3bca7033..2cd7520ae4 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.transport.ConnectionException; diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java index a313475b11..bf1fbbf1a3 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.test.unit.client.connection; +import javax.jms.Connection; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -57,5 +59,20 @@ public class ConnectionFactoryTest extends QpidBrokerTestCase assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername()); assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword()); } - + + /** + * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the + * default constructor and provided with the connection url via setter. + */ + public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception + { + String broker = getBroker().toString(); + String url = "amqp://guest:guest@clientID/test?brokerlist='" + broker + "'"; + + AMQConnectionFactory factory = new AMQConnectionFactory(); + factory.setConnectionURLString(url); + + Connection con = factory.createConnection(); + con.close(); + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index cc76d89a67..b11df5a2a0 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -30,7 +30,7 @@ import javax.jms.Queue; * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration * is set for a virtual host. * - * A producer that is idle for too long or open for too long will have its connection closed and + * A producer that is idle for too long or open for too long will have its connection/session(0-10) closed and * any further operations will fail with a 408 resource timeout exception. Consumers will not * be affected by the transaction timeout configuration. */ diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java index e940a73bbb..39973e12c7 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java @@ -1,6 +1,5 @@ -package org.apache.qpid.test.unit.xa; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,17 +7,18 @@ package org.apache.qpid.test.unit.xa; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ +package org.apache.qpid.test.unit.xa; import junit.framework.TestSuite; @@ -344,7 +344,7 @@ public class FaultTest extends AbstractXATestCase { assertEquals("Wrong error code: ", XAException.XAER_PROTO, e.errorCode); } - } + } /** * Strategy: @@ -366,27 +366,28 @@ public class FaultTest extends AbstractXATestCase /** * Strategy: * Check that a transaction timeout as expected - * - set timeout to 10ms - * - sleep 1000ms + * - set timeout to 1s + * - sleep 1500ms * - call end and check that the expected exception is thrown */ public void testTransactionTimeout() throws Exception { + _xaResource.setTransactionTimeout(1); + Xid xid = getNewXid(); try { _xaResource.start(xid, XAResource.TMNOFLAGS); - assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0); - _xaResource.setTransactionTimeout(10); - Thread.sleep(1000); + Thread.sleep(1500); _xaResource.end(xid, XAResource.TMSUCCESS); + fail("Timeout expected "); } catch (XAException e) { assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT, e.errorCode); } } - + /** * Strategy: * Set the transaction timeout to 1000 @@ -394,18 +395,18 @@ public class FaultTest extends AbstractXATestCase public void testTransactionTimeoutAfterCommit() throws Exception { Xid xid = getNewXid(); - + _xaResource.start(xid, XAResource.TMNOFLAGS); _xaResource.setTransactionTimeout(1000); assertEquals("Wrong timeout", 1000,_xaResource.getTransactionTimeout()); - + //_xaResource.prepare(xid); _xaResource.end(xid, XAResource.TMSUCCESS); _xaResource.commit(xid, true); - + _xaResource.setTransactionTimeout(2000); assertEquals("Wrong timeout", 2000,_xaResource.getTransactionTimeout()); - + xid = getNewXid(); _xaResource.start(xid, XAResource.TMNOFLAGS); assertEquals("Wrong timeout", 2000, _xaResource.getTransactionTimeout()); diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java index 66b3fe0c6a..3af57327d2 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java @@ -25,4 +25,5 @@ public interface BrokerHolder String getWorkingDirectory(); void shutdown(); void kill(); + String dumpThreads(); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index adda9ca3ec..a71a4ef517 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.test.utils; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Set; + import org.apache.log4j.Logger; import org.apache.qpid.server.Broker; @@ -31,7 +36,9 @@ public class InternalBrokerHolder implements BrokerHolder private final Broker _broker; private final String _workingDirectory; - public InternalBrokerHolder(final Broker broker, String workingDirectory) + private Set<Integer> _portsUsedByBroker; + + public InternalBrokerHolder(final Broker broker, String workingDirectory, Set<Integer> portsUsedByBroker) { if(broker == null) { @@ -40,6 +47,7 @@ public class InternalBrokerHolder implements BrokerHolder _broker = broker; _workingDirectory = workingDirectory; + _portsUsedByBroker = portsUsedByBroker; } @Override @@ -53,7 +61,9 @@ public class InternalBrokerHolder implements BrokerHolder LOGGER.info("Shutting down Broker instance"); _broker.shutdown(); - + + waitUntilPortsAreFree(); + LOGGER.info("Broker instance shutdown"); } @@ -64,5 +74,42 @@ public class InternalBrokerHolder implements BrokerHolder shutdown(); } + private void waitUntilPortsAreFree() + { + new PortHelper().waitUntilPortsAreFree(_portsUsedByBroker); + } + + @Override + public String dumpThreads() + { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + StringBuilder dump = new StringBuilder(); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : threadInfos) + { + dump.append(threadInfo); + } + + long[] deadLocks = threadMXBean.findDeadlockedThreads(); + if (deadLocks != null && deadLocks.length > 0) + { + ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks); + dump.append(String.format("%n")); + dump.append("Deadlock is detected!"); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : deadlockedThreads) + { + dump.append(threadInfo); + } + } + return dump.toString(); + } + + @Override + public String toString() + { + return "InternalBrokerHolder [_portsUsedByBroker=" + _portsUsedByBroker + "]"; + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index d9c259c389..43b80b45fb 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -33,11 +33,15 @@ import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.UserManagement; +import javax.management.InstanceNotFoundException; import javax.management.JMException; +import javax.management.ListenerNotFoundException; import javax.management.MBeanException; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import java.io.IOException; @@ -50,8 +54,8 @@ import java.util.Set; */ public class JMXTestUtils { - private static final String DEFAULT_PASSWORD = "admin"; - private static final String DEFAULT_USERID = "admin"; + public static final String DEFAULT_PASSWORD = "admin"; + public static final String DEFAULT_USERID = "admin"; private MBeanServerConnection _mbsc; private JMXConnector _jmxc; @@ -79,8 +83,16 @@ public class JMXTestUtils public void open() throws Exception { + open(0); // Zero signifies default broker to QBTC. + } + + public void open(final int brokerPort) throws Exception + { + int actualBrokerPort = _test.getPort(brokerPort); + int managementPort = _test.getManagementPort(actualBrokerPort); + _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", - _test.getManagementPort(_test.getPort()), _user, _password); + managementPort, _user, _password); _mbsc = _jmxc.getMBeanServerConnection(); } @@ -93,6 +105,18 @@ public class JMXTestUtils } } + public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) + throws InstanceNotFoundException, IOException + { + _mbsc.addNotificationListener(name, listener, filter, handback); + } + + public void removeNotificationListener(ObjectName name, NotificationListener listener) + throws InstanceNotFoundException, IOException, ListenerNotFoundException + { + _mbsc.removeNotificationListener(name, listener); + } + /** * Create a non-durable exchange with the requested name * @@ -136,7 +160,6 @@ public class JMXTestUtils throws IOException, JMException, MBeanException { ManagedBroker managedBroker = getManagedBroker(virtualHostName); - managedBroker.unregisterExchange(exchange); } @@ -152,87 +175,81 @@ public class JMXTestUtils throws IOException, JMException, MBeanException { ManagedBroker managedBroker = getManagedBroker(virtualHostName); - managedBroker.deleteQueue(queueName); } - + /** - * Sets the logging level. + * Sets the logging level. * * @throws JMException * @throws IOException if there is a problem with the JMX Connection * @throws MBeanException */ public void setRuntimeLoggerLevel(String logger, String level) - throws IOException, JMException, MBeanException + throws IOException, JMException, MBeanException { LoggingManagement loggingManagement = getLoggingManagement(); - loggingManagement.setRuntimeLoggerLevel(logger, level); } - + /** - * Reload logging config file. + * Reload logging config file. * * @throws JMException * @throws IOException if there is a problem with the JMX Connection * @throws MBeanException */ public void reloadConfigFile() - throws IOException, JMException, MBeanException + throws IOException, JMException, MBeanException { LoggingManagement loggingManagement = getLoggingManagement(); - loggingManagement.reloadConfigFile(); } /** - * Get list of available logger levels. + * Get list of available logger levels. * * @throws JMException * @throws IOException if there is a problem with the JMX Connection * @throws MBeanException */ public String[] getAvailableLoggerLevels() - throws IOException, JMException, MBeanException + throws IOException, JMException, MBeanException { LoggingManagement loggingManagement = getLoggingManagement(); - return loggingManagement.getAvailableLoggerLevels(); } - + /** - * Set root logger level. + * Set root logger level. * * @throws JMException * @throws IOException if there is a problem with the JMX Connection * @throws MBeanException */ public void setRuntimeRootLoggerLevel(String level) - throws IOException, JMException, MBeanException + throws IOException, JMException, MBeanException { LoggingManagement loggingManagement = getLoggingManagement(); - loggingManagement.setRuntimeRootLoggerLevel(level); } - + /** - * Get root logger level. + * Get root logger level. * * @throws JMException * @throws IOException if there is a problem with the JMX Connection * @throws MBeanException */ public String getRuntimeRootLoggerLevel() - throws IOException, JMException, MBeanException + throws IOException, JMException, MBeanException { LoggingManagement loggingManagement = getLoggingManagement(); - return loggingManagement.getRuntimeRootLoggerLevel(); } /** - * Retrive the ObjectName for a Virtualhost. + * Retrieve the ObjectName for a Virtualhost. * * This is then used to create a proxy to the ManagedBroker MBean. * @@ -253,12 +270,12 @@ public class JMXTestUtils // We have verified we have only one value in objectNames so return it ObjectName objectName = objectNames.iterator().next(); - _test.getLogger().info("Loading: " + objectName); + _test.getLogger().info("Loading: " + objectName); return objectName; } /** - * Retrive the ObjectName for the given Queue on a Virtualhost. + * Retrieve the ObjectName for the given Queue on a Virtualhost. * * This is then used to create a proxy to the ManagedQueue MBean. * @@ -281,7 +298,7 @@ public class JMXTestUtils // We have verified we have only one value in objectNames so return it ObjectName objectName = objectNames.iterator().next(); - _test.getLogger().info("Loading: " + objectName); + _test.getLogger().info("Loading: " + objectName); return objectName; } @@ -309,7 +326,7 @@ public class JMXTestUtils // We have verified we have only one value in objectNames so return it ObjectName objectName = objectNames.iterator().next(); - _test.getLogger().info("Loading: " + objectName); + _test.getLogger().info("Loading: " + objectName); return objectName; } @@ -319,10 +336,10 @@ public class JMXTestUtils Set<ObjectName> objectNames = queryObjects(query); _test.assertNotNull("Null ObjectName Set returned", objectNames); - _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size()); + _test.assertEquals("Unexpected number of objects matching " + managedClass + " returned", 1, objectNames.size()); ObjectName objectName = objectNames.iterator().next(); - _test.getLogger().info("Loading: " + objectName); + _test.getLogger().info("Loading: " + objectName); return getManagedObject(managedClass, objectName); } @@ -355,34 +372,34 @@ public class JMXTestUtils { return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost)); } - + public ManagedExchange getManagedExchange(String exchangeName) { - ObjectName objectName = getExchangeObjectName("test", exchangeName); + ObjectName objectName = getExchangeObjectName("test", exchangeName); return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, ManagedExchange.class, false); } - + public ManagedQueue getManagedQueue(String queueName) { ObjectName objectName = getQueueObjectName("test", queueName); return getManagedObject(ManagedQueue.class, objectName); } - public LoggingManagement getLoggingManagement() throws MalformedObjectNameException + public LoggingManagement getLoggingManagement() throws MalformedObjectNameException { - ObjectName objectName = new ObjectName("org.apache.qpid:type=LoggingManagement,name=LoggingManagement"); + ObjectName objectName = new ObjectName("org.apache.qpid:type=LoggingManagement,name=LoggingManagement"); return getManagedObject(LoggingManagement.class, objectName); } - - public ConfigurationManagement getConfigurationManagement() throws MalformedObjectNameException + + public ConfigurationManagement getConfigurationManagement() throws MalformedObjectNameException { - ObjectName objectName = new ObjectName("org.apache.qpid:type=ConfigurationManagement,name=ConfigurationManagement"); + ObjectName objectName = new ObjectName("org.apache.qpid:type=ConfigurationManagement,name=ConfigurationManagement"); return getManagedObject(ConfigurationManagement.class, objectName); } - - public UserManagement getUserManagement() throws MalformedObjectNameException + + public UserManagement getUserManagement() throws MalformedObjectNameException { - ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement"); + ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement"); return getManagedObject(UserManagement.class, objectName); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index c070fb4de0..aa909a6674 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -17,11 +17,41 @@ */ package org.apache.qpid.test.utils; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.naming.InitialContext; +import javax.naming.NamingException; + import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; @@ -33,59 +63,33 @@ import org.apache.qpid.management.common.mbeans.ConfigurationManagement; import org.apache.qpid.server.Broker; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.ProtocolExclusion; +import org.apache.qpid.server.ProtocolInclusion; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.store.MessageStoreConstants; -import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory; +import org.apache.qpid.server.store.derby.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.LogMonitor; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * Qpid base class for system testing test cases. */ public class QpidBrokerTestCase extends QpidTestCase { - public enum BrokerType { EXTERNAL /** Test case relies on a Broker started independently of the test-suite */, INTERNAL /** Test case starts an embedded broker within this JVM */, SPAWNED /** Test case spawns a new broker as a separate process */ } + + public static final String GUEST_USERNAME = "guest"; + public static final String GUEST_PASSWORD = "guest"; + protected final static String QpidHome = System.getProperty("QPID_HOME"); protected File _configFile = new File(System.getProperty("broker.config")); + protected File _logConfigFile = new File(System.getProperty("log4j.configuration")); protected static final Logger _logger = Logger.getLogger(QpidBrokerTestCase.class); protected static final int LOGMONITOR_TIMEOUT = 5000; @@ -113,8 +117,10 @@ public class QpidBrokerTestCase extends QpidTestCase } // system properties + private static final String TEST_VIRTUALHOSTS = "test.virtualhosts"; + private static final String TEST_CONFIG = "test.config"; private static final String BROKER_LANGUAGE = "broker.language"; - private static final String BROKER_TYPE = "broker.type"; + protected static final String BROKER_TYPE = "broker.type"; private static final String BROKER_COMMAND = "broker.command"; private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; @@ -125,7 +131,8 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave"; private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; - private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; + public static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; + public static final String BROKER_PROTOCOL_INCLUDES = "broker.protocol.includes"; // values protected static final String JAVA = "java"; @@ -146,7 +153,6 @@ public class QpidBrokerTestCase extends QpidTestCase private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, "")); protected String _output = System.getProperty(TEST_OUTPUT, System.getProperty("java.io.tmpdir")); protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); - private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES); protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: "); protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE); @@ -164,13 +170,13 @@ public class QpidBrokerTestCase extends QpidTestCase protected List<Connection> _connections = new ArrayList<Connection>(); public static final String QUEUE = "queue"; public static final String TOPIC = "topic"; - + /** Map to hold test defined environment properties */ private Map<String, String> _env; /** Ensure our messages have some sort of size */ protected static final int DEFAULT_MESSAGE_SIZE = 1024; - + /** Size to create our message*/ private int _messageSize = DEFAULT_MESSAGE_SIZE; /** Type of message*/ @@ -193,7 +199,7 @@ public class QpidBrokerTestCase extends QpidTestCase { super(); } - + public Logger getLogger() { return QpidBrokerTestCase._logger; @@ -219,10 +225,6 @@ public class QpidBrokerTestCase extends QpidTestCase out = new PrintStream(new FileOutputStream(_outputFile), true); err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname)); - // This is relying on behaviour specific to log4j 1.2.12. If we were to upgrade to 1.2.13 or - // beyond we must change either code (or config) to ensure that ConsoleAppender#setFollow - // is set to true otherwise log4j logging will not respect the following reassignment. - System.setOut(out); System.setErr(err); @@ -313,6 +315,24 @@ public class QpidBrokerTestCase extends QpidTestCase } /** + * The returned set of port numbers is only a guess because it assumes no ports have been overridden + * using system properties. + */ + protected Set<Integer> guessAllPortsUsedByBroker(int mainPort) + { + Set<Integer> ports = new HashSet<Integer>(); + int managementPort = getManagementPort(mainPort); + int connectorServerPort = managementPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET; + + ports.add(mainPort); + ports.add(managementPort); + ports.add(connectorServerPort); + ports.add(DEFAULT_SSL_PORT); + + return ports; + } + + /** * Get the Port that is use by the current broker * * @return the current port @@ -338,12 +358,16 @@ public class QpidBrokerTestCase extends QpidTestCase { final int sslPort = port-1; final String protocolExcludesList = getProtocolExcludesList(port, sslPort); + final String protocolIncludesList = getProtocolIncludesList(port, sslPort); + return _brokerCommand .replace("@PORT", "" + port) .replace("@SSL_PORT", "" + sslPort) .replace("@MPORT", "" + getManagementPort(port)) .replace("@CONFIG_FILE", _configFile.toString()) - .replace("@EXCLUDES", protocolExcludesList); + .replace("@LOG_CONFIG_FILE", _logConfigFile.toString()) + .replace("@EXCLUDES", protocolExcludesList) + .replace("@INCLUDES", protocolIncludesList); } public void startBroker() throws Exception @@ -353,39 +377,51 @@ public class QpidBrokerTestCase extends QpidTestCase public void startBroker(int port) throws Exception { + startBroker(port, _testConfiguration, _testVirtualhosts); + } + + public void startBroker(int port, XMLConfiguration testConfiguration, XMLConfiguration virtualHosts) throws Exception + { port = getPort(port); // Save any configuration changes that have been made - saveTestConfiguration(); - saveTestVirtualhosts(); + String testConfig = saveTestConfiguration(port, testConfiguration); + String virtualHostsConfig = saveTestVirtualhosts(port, virtualHosts); if(_brokers.get(port) != null) { throw new IllegalStateException("There is already an existing broker running on port " + port); } + Set<Integer> portsUsedByBroker = guessAllPortsUsedByBroker(port); + if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker()) { setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); - saveTestConfiguration(); + testConfig = saveTestConfiguration(port, testConfiguration); + _logger.info("Set test.config property to: " + testConfig); + _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig); + setSystemProperty(TEST_CONFIG, testConfig); + setSystemProperty(TEST_VIRTUALHOSTS, virtualHostsConfig); BrokerOptions options = new BrokerOptions(); options.setConfigFile(_configFile.getAbsolutePath()); options.addPort(port); addExcludedPorts(port, DEFAULT_SSL_PORT, options); + addIncludedPorts(port, DEFAULT_SSL_PORT, options); options.setJmxPortRegistryServer(getManagementPort(port)); //Set the log config file, relying on the log4j.configuration system property //set on the JVM by the JUnit runner task in module.xml. - options.setLogConfigFile(new URL(System.getProperty("log4j.configuration")).getFile()); + options.setLogConfigFile(_logConfigFile.getAbsolutePath()); Broker broker = new Broker(); _logger.info("starting internal broker (same JVM)"); broker.startup(options); - _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"))); + _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"), portsUsedByBroker)); } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { @@ -395,16 +431,16 @@ public class QpidBrokerTestCase extends QpidTestCase _logger.info("starting external broker: " + cmd); ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); - Map<String, String> env = pb.environment(); + Map<String, String> processEnv = pb.environment(); String qpidHome = System.getProperty(QPID_HOME); - env.put(QPID_HOME, qpidHome); + processEnv.put(QPID_HOME, qpidHome); //Augment Path with bin directory in QPID_HOME. - env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); + processEnv.put("PATH", processEnv.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); //Add the test name to the broker run. // DON'T change PNAME, qpid.stop needs this value. - env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\""); - env.put("QPID_WORK", qpidWork); + processEnv.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\""); + processEnv.put("QPID_WORK", qpidWork); // Use the environment variable to set amqj.logging.level for the broker // The value used is a 'server' value in the test configuration to @@ -419,7 +455,7 @@ public class QpidBrokerTestCase extends QpidTestCase { for (Map.Entry<String, String> entry : _env.entrySet()) { - env.put(entry.getKey(), entry.getValue()); + processEnv.put(entry.getKey(), entry.getValue()); } } @@ -436,25 +472,25 @@ public class QpidBrokerTestCase extends QpidTestCase setSystemProperty("root.logging.level"); } + // set test.config and test.virtualhosts + String qpidOpts = " -D" + TEST_CONFIG + "=" + testConfig + " -D" + TEST_VIRTUALHOSTS + "=" + virtualHostsConfig; - String QPID_OPTS = " "; // Add all the specified system properties to QPID_OPTS if (!_propertiesSetForBroker.isEmpty()) { for (String key : _propertiesSetForBroker.keySet()) { - QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " "; - } - - if (env.containsKey("QPID_OPTS")) - { - env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS); - } - else - { - env.put("QPID_OPTS", QPID_OPTS); + qpidOpts += " -D" + key + "=" + _propertiesSetForBroker.get(key); } } + if (processEnv.containsKey("QPID_OPTS")) + { + qpidOpts = processEnv.get("QPID_OPTS") + qpidOpts; + } + processEnv.put("QPID_OPTS", qpidOpts); + + _logger.info("Set test.config property to: " + testConfig); + _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig); // cpp broker requires that the work directory is created createBrokerWork(qpidWork); @@ -492,14 +528,14 @@ public class QpidBrokerTestCase extends QpidTestCase // this is expect if the broker started successfully } - _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork)); + _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork, portsUsedByBroker)); } } private void addExcludedPorts(int port, int sslPort, BrokerOptions options) { final String protocolExcludesList = getProtocolExcludesList(port, sslPort); - + if (protocolExcludesList.equals("")) { return; @@ -522,9 +558,36 @@ public class QpidBrokerTestCase extends QpidTestCase protected String getProtocolExcludesList(int port, int sslPort) { - final String protocolExcludesList = - _brokerProtocolExcludes.replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort); - return protocolExcludesList; + return System.getProperty(BROKER_PROTOCOL_EXCLUDES,"").replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort); + } + + private String getProtocolIncludesList(int port, int sslPort) + { + return System.getProperty(BROKER_PROTOCOL_INCLUDES, "").replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort); + } + + private void addIncludedPorts(int port, int sslPort, BrokerOptions options) + { + final String protocolIncludesList = getProtocolIncludesList(port, sslPort); + + if (protocolIncludesList.equals("")) + { + return; + } + final String[] toks = protocolIncludesList.split("\\s"); + + if(toks.length % 2 != 0) + { + throw new IllegalArgumentException("Must be an even number of tokens in '" + protocolIncludesList + "'"); + } + for (int i = 0; i < toks.length; i=i+2) + { + String includeArg = toks[i]; + final int includedPort = Integer.parseInt(toks[i+1]); + options.addIncludedPort(ProtocolInclusion.lookup(includeArg), includedPort); + + _logger.info("Adding protocol inclusion " + includeArg + " " + includedPort); + } } private boolean existingInternalBroker() @@ -552,12 +615,17 @@ public class QpidBrokerTestCase extends QpidTestCase public String getTestConfigFile() { - return _output + "/" + getTestQueueName() + "-config.xml"; + return getTestConfigFile(getPort()); + } + + public String getTestConfigFile(int port) + { + return _output + "/" + getTestQueueName() + "-" + port + "-config.xml"; } - public String getTestVirtualhostsFile() + public String getTestVirtualhostsFile(int port) { - return _output + "/" + getTestQueueName() + "-virtualhosts.xml"; + return _output + "/" + getTestQueueName() + "-" + port + "-virtualhosts.xml"; } private String relativeToQpidHome(String file) @@ -567,38 +635,50 @@ public class QpidBrokerTestCase extends QpidTestCase protected void saveTestConfiguration() throws ConfigurationException { + String relative = saveTestConfiguration(getPort(), _testConfiguration); + _logger.info("Set test.config property to: " + relative); + setSystemProperty(TEST_CONFIG, relative); + } + + protected String saveTestConfiguration(int port, XMLConfiguration testConfiguration) throws ConfigurationException + { // Specify the test config file - String testConfig = getTestConfigFile(); + String testConfig = getTestConfigFile(port); String relative = relativeToQpidHome(testConfig); - setSystemProperty("test.config", relative); - _logger.info("Set test.config property to: " + relative); _logger.info("Saving test virtualhosts file at: " + testConfig); // Create the file if configuration does not exist - if (_testConfiguration.isEmpty()) + if (testConfiguration.isEmpty()) { - _testConfiguration.addProperty("__ignore", "true"); + testConfiguration.addProperty("__ignore", "true"); } - _testConfiguration.save(testConfig); + testConfiguration.save(testConfig); + return relative; } protected void saveTestVirtualhosts() throws ConfigurationException { + String relative = saveTestVirtualhosts(getPort(), _testVirtualhosts); + _logger.info("Set test.virtualhosts property to: " + relative); + setSystemProperty(TEST_VIRTUALHOSTS, relative); + } + + protected String saveTestVirtualhosts(int port, XMLConfiguration virtualHostConfiguration) throws ConfigurationException + { // Specify the test virtualhosts file - String testVirtualhosts = getTestVirtualhostsFile(); + String testVirtualhosts = getTestVirtualhostsFile(port); String relative = relativeToQpidHome(testVirtualhosts); - setSystemProperty("test.virtualhosts", relative); - _logger.info("Set test.virtualhosts property to: " + relative); - _logger.info("Saving test virtualhosts file at: " + testVirtualhosts); + _logger.info("Set test.virtualhosts property to: " + testVirtualhosts); // Create the file if configuration does not exist - if (_testVirtualhosts.isEmpty()) + if (virtualHostConfiguration.isEmpty()) { - _testVirtualhosts.addProperty("__ignore", "true"); + virtualHostConfiguration.addProperty("__ignore", "true"); } - _testVirtualhosts.save(testVirtualhosts); + virtualHostConfiguration.save(testVirtualhosts); + return relative; } protected void cleanBrokerWork(final String qpidWork) @@ -639,11 +719,55 @@ public class QpidBrokerTestCase extends QpidTestCase public void stopAllBrokers() { + boolean exceptionOccured = false; Set<Integer> runningBrokerPorts = new HashSet<Integer>(getBrokerPortNumbers()); for (int brokerPortNumber : runningBrokerPorts) { + if (!stopBrokerSafely(brokerPortNumber)) + { + exceptionOccured = true; + } + } + if (exceptionOccured) + { + throw new RuntimeException("Exception occured on stopping of test broker. Please, examine logs for details"); + } + } + + protected boolean stopBrokerSafely(int brokerPortNumber) + { + boolean success = true; + BrokerHolder broker = _brokers.get(brokerPortNumber); + try + { stopBroker(brokerPortNumber); } + catch(Exception e) + { + success = false; + _logger.error("Failed to stop broker " + broker + " at port " + brokerPortNumber, e); + if (broker != null) + { + // save the thread dump in case of dead locks + try + { + _logger.error("Broker " + broker + " thread dump:" + broker.dumpThreads()); + } + finally + { + // try to kill broker + try + { + broker.kill(); + } + catch(Exception killException) + { + // ignore + } + } + } + } + return success; } public void stopBroker(int port) @@ -705,21 +829,21 @@ public class QpidBrokerTestCase extends QpidTestCase protected void makeVirtualHostPersistent(String virtualhost) throws ConfigurationException, IOException { - Class<?> storeFactoryClass = null; + Class<?> storeClass = null; try { // Try and lookup the BDB class - storeFactoryClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory"); + storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore"); } catch (ClassNotFoundException e) { // No BDB store, we'll use Derby instead. - storeFactoryClass = DerbyMessageStoreFactory.class; + storeClass = DerbyMessageStore.class; } - setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.factoryclass", - storeFactoryClass.getName()); + setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class", + storeClass.getName()); setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, "${QPID_WORK}/" + virtualhost); } @@ -974,10 +1098,10 @@ public class QpidBrokerTestCase extends QpidTestCase { return (AMQConnectionFactory) getInitialContext().lookup(factoryName); } - + public Connection getConnection() throws JMSException, NamingException { - return getConnection("guest", "guest"); + return getConnection(GUEST_USERNAME, GUEST_PASSWORD); } public Connection getConnection(ConnectionURL url) throws JMSException @@ -1268,14 +1392,14 @@ public class QpidBrokerTestCase extends QpidTestCase /** * Reloads the broker security configuration using the ApplicationRegistry (InVM brokers) or the - * ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be + * ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be * enabled before calling the method). */ public void reloadBrokerSecurityConfig() throws Exception { JMXTestUtils jmxu = new JMXTestUtils(this); jmxu.open(); - + try { ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); @@ -1285,7 +1409,7 @@ public class QpidBrokerTestCase extends QpidTestCase { jmxu.close(); } - + LogMonitor _monitor = new LogMonitor(_outputFile); assertTrue("The expected server security configuration reload did not occur", _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); @@ -1295,4 +1419,24 @@ public class QpidBrokerTestCase extends QpidTestCase { return FAILING_PORT; } + + public XMLConfiguration getTestVirtualhosts() + { + return _testVirtualhosts; + } + + public void setTestVirtualhosts(XMLConfiguration testVirtualhosts) + { + _testVirtualhosts = testVirtualhosts; + } + + public XMLConfiguration getTestConfiguration() + { + return _testConfiguration; + } + + public void setTestConfiguration(XMLConfiguration testConfiguration) + { + _testConfiguration = testConfiguration; + } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java index 787fc164d5..bce97a574a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.test.utils; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.util.Set; import org.apache.log4j.Logger; @@ -32,8 +35,9 @@ public class SpawnedBrokerHolder implements BrokerHolder private final Process _process; private final Integer _pid; private final String _workingDirectory; + private Set<Integer> _portsUsedByBroker; - public SpawnedBrokerHolder(final Process process, final String workingDirectory) + public SpawnedBrokerHolder(final Process process, final String workingDirectory, Set<Integer> portsUsedByBroker) { if(process == null) { @@ -43,6 +47,7 @@ public class SpawnedBrokerHolder implements BrokerHolder _process = process; _pid = retrieveUnixPidIfPossible(); _workingDirectory = workingDirectory; + _portsUsedByBroker = portsUsedByBroker; } @Override @@ -57,6 +62,8 @@ public class SpawnedBrokerHolder implements BrokerHolder _process.destroy(); reapChildProcess(); + + waitUntilPortsAreFree(); } @Override @@ -74,6 +81,8 @@ public class SpawnedBrokerHolder implements BrokerHolder } reapChildProcess(); + + waitUntilPortsAreFree(); } private void sendSigkillForImmediateShutdown(Integer pid) @@ -146,4 +155,37 @@ public class SpawnedBrokerHolder implements BrokerHolder } } + private void waitUntilPortsAreFree() + { + new PortHelper().waitUntilPortsAreFree(_portsUsedByBroker); + } + + @Override + public String dumpThreads() + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try + { + Process process = Runtime.getRuntime().exec("jstack " + _pid); + InputStream is = process.getInputStream(); + byte[] buffer = new byte[1024]; + int length = -1; + while ((length = is.read(buffer)) != -1) + { + baos.write(buffer, 0, length); + } + } + catch (Exception e) + { + LOGGER.error("Error whilst collecting thread dump for " + _pid, e); + } + return new String(baos.toByteArray()); + } + + @Override + public String toString() + { + return "SpawnedBrokerHolder [_pid=" + _pid + ", _portsUsedByBroker=" + + _portsUsedByBroker + "]"; + } } |