diff options
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx')
11 files changed, 3040 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java new file mode 100644 index 0000000000..f6b56f64ce --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/BrokerManagementTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests the JMX API for the Managed Broker. + * + */ +public class BrokerManagementTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST = "test"; + + /** + * JMX helper. + */ + private JMXTestUtils _jmxUtils; + private ManagedBroker _managedBroker; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _jmxUtils.open(); + _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + } + + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + /** + * Tests queue creation/deletion also verifying the automatic binding to the default exchange. + */ + public void testCreateQueueAndDeletion() throws Exception + { + final String queueName = getTestQueueName(); + + + _managedBroker.createNewQueue(queueName, "testowner", true); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + + // Now delete the queue + _managedBroker.deleteQueue(queueName); + + + } + + /** + * Tests exchange creation/deletion via JMX API. + */ + public void testCreateExchangeAndUnregister() throws Exception + { + String exchangeName = getTestName(); + _managedBroker.createNewExchange(exchangeName, "topic", true); + + ManagedExchange exchange = _jmxUtils.getManagedExchange(exchangeName); + assertNotNull("Exchange should exist", exchange); + + _managedBroker.unregisterExchange(exchangeName); + } + + /** + * Tests that it is disallowed to unregister the default exchange. + */ + public void testUnregisterOfAmqDirectExchangeDisallowed() throws Exception + { + String amqDirectExchangeName = "amq.direct"; + + ManagedExchange amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName); + assertNotNull("Exchange should exist", amqDirectExchange); + try + { + _managedBroker.unregisterExchange(amqDirectExchangeName); + fail("Exception not thrown"); + } + catch (UnsupportedOperationException e) + { + // PASS + assertEquals("'"+amqDirectExchangeName+"' is a reserved exchange and can't be deleted", e.getMessage()); + } + amqDirectExchange = _jmxUtils.getManagedExchange(amqDirectExchangeName); + assertNotNull("Exchange should exist", amqDirectExchange); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java new file mode 100644 index 0000000000..34b13dfaca --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; + +import org.apache.commons.lang.StringUtils; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ConnectionManagementTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST_NAME = "test"; + + private JMXTestUtils _jmxUtils; + private Connection _connection; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _jmxUtils.open(); + } + + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception + { + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + _connection = getConnection(); + assertEquals("Expected one managed connection", 1, getManagedConnections().size()); + + _connection.close(); + assertEquals("Expected no managed connections after client connection closed", 0, getManagedConnections().size()); + } + + public void testGetAttributes() throws Exception + { + _connection = getConnection(); + final ManagedConnection mBean = getConnectionMBean(); + + checkAuthorisedId(mBean); + checkClientVersion(mBean); + checkClientId(mBean); + } + + public void testNonTransactedSession() throws Exception + { + _connection = getConnection(); + + boolean transactional = false; + boolean flowBlocked = false; + + _connection.createSession(transactional, Session.AUTO_ACKNOWLEDGE); + + final ManagedConnection mBean = getConnectionMBean(); + final CompositeDataSupport row = getTheOneChannelRow(mBean); + assertChannelRowData(row, 0, transactional, flowBlocked); + } + + public void testTransactedSessionWithUnackMessages() throws Exception + { + _connection = getConnection(); + _connection.start(); + + boolean transactional = true; + int numberOfMessages = 2; + final Session session = _connection.createSession(transactional, Session.SESSION_TRANSACTED); + final Destination destination = session.createQueue(getTestQueueName()); + final MessageConsumer consumer = session.createConsumer(destination); + + sendMessage(session, destination, numberOfMessages); + receiveMessagesWithoutCommit(consumer, numberOfMessages); + + final ManagedConnection mBean = getConnectionMBean(); + final CompositeDataSupport row = getTheOneChannelRow(mBean); + boolean flowBlocked = false; + assertChannelRowData(row, numberOfMessages, transactional, flowBlocked); + + // check that commit advances the lastIoTime + final Date initialLastIOTime = mBean.getLastIoTime(); + session.commit(); + assertTrue("commit should have caused last IO time to advance", mBean.getLastIoTime().after(initialLastIOTime)); + + // check that channels() now returns one session with no unacknowledged messages + final CompositeDataSupport rowAfterCommit = getTheOneChannelRow(mBean); + final Number unackCountAfterCommit = (Number) rowAfterCommit.get(ManagedConnection.UNACKED_COUNT); + assertEquals("Unexpected number of unacknowledged messages", 0, unackCountAfterCommit); + } + + + public void testProducerFlowBlocked() throws Exception + { + _connection = getConnection(); + _connection.start(); + + String queueName = getTestQueueName(); + Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + createQueueOnBroker(session, queue); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + managedQueue.setFlowResumeCapacity(DEFAULT_MESSAGE_SIZE * 2l); + managedQueue.setCapacity(DEFAULT_MESSAGE_SIZE * 3l); + + final ManagedConnection managedConnection = getConnectionMBean(); + + // Check that producer flow is not block before test + final CompositeDataSupport rowBeforeSend = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowBeforeSend, false); + + + // Check that producer flow does not become block too soon + sendMessage(session, queue, 3); + final CompositeDataSupport rowBeforeFull = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowBeforeFull, false); + + // Fourth message will over-fill the queue (but as we are not sending more messages, client thread wont't block) + sendMessage(session, queue, 1); + final CompositeDataSupport rowAfterFull = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowAfterFull, true); + + // Consume two to bring the queue down to the resume capacity + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull("Could not receive first message", consumer.receive(1000)); + assertNotNull("Could not receive second message", consumer.receive(1000)); + session.commit(); + + // Check that producer flow is no longer blocked + final CompositeDataSupport rowAfterReceive = getTheOneChannelRow(managedConnection); + assertFlowBlocked(rowAfterReceive, false); + } + + private void createQueueOnBroker(Session session, Destination destination) throws JMSException + { + session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } + + private void assertChannelRowData(final CompositeData row, int unacknowledgedMessages, boolean isTransactional, boolean flowBlocked) + { + assertNotNull(row); + assertEquals("Unexpected transactional flag", isTransactional, row.get(ManagedConnection.TRANSACTIONAL)); + assertEquals("Unexpected unacknowledged message count", unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT)); + assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED)); + } + + private void assertFlowBlocked(final CompositeData row, boolean flowBlocked) + { + assertNotNull(row); + assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED)); + } + + private void checkAuthorisedId(ManagedConnection mBean) throws Exception + { + assertEquals("Unexpected authorized id", GUEST_USERNAME, mBean.getAuthorizedId()); + } + + private void checkClientVersion(ManagedConnection mBean) throws Exception + { + String expectedVersion = QpidProperties.getReleaseVersion(); + assertTrue(StringUtils.isNotBlank(expectedVersion)); + + assertEquals("Unexpected version", expectedVersion, mBean.getVersion()); + } + + private void checkClientId(ManagedConnection mBean) throws Exception + { + String expectedClientId = _connection.getClientID(); + assertTrue(StringUtils.isNotBlank(expectedClientId)); + + assertEquals("Unexpected ClientId", expectedClientId, mBean.getClientId()); + } + + private ManagedConnection getConnectionMBean() + { + List<ManagedConnection> connections = getManagedConnections(); + assertNotNull("Connection MBean is not found", connections); + assertEquals("Unexpected number of connection mbeans", 1, connections.size()); + final ManagedConnection mBean = connections.get(0); + assertNotNull("Connection MBean is null", mBean); + return mBean; + } + + private List<ManagedConnection> getManagedConnections() + { + return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME); + } + + private CompositeDataSupport getTheOneChannelRow(final ManagedConnection mBean) throws Exception + { + TabularData channelsData = getChannelsDataWithRetry(mBean); + + assertEquals("Unexpected number of rows in channel table", 1, channelsData.size()); + + @SuppressWarnings("unchecked") + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator(); + final CompositeDataSupport row = rowItr.next(); + return row; + } + + private void receiveMessagesWithoutCommit(final MessageConsumer consumer, int numberOfMessages) throws Exception + { + for (int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message " + i + " is not received", m); + } + } + + private TabularData getChannelsDataWithRetry(final ManagedConnection mBean) + throws IOException, JMException + { + TabularData channelsData = mBean.channels(); + int retries = 0; + while(channelsData.size() == 0 && retries < 5) + { + sleep(); + channelsData = mBean.channels(); + retries++; + } + return channelsData; + } + + private void sleep() + { + try + { + Thread.sleep(50); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + } + }} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java new file mode 100644 index 0000000000..8c0a11b7cc --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import java.util.Collections; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ExchangeManagementTest extends QpidBrokerTestCase +{ + private static final String MESSAGE_PROPERTY_INDEX = "index"; + private static final String MESSAGE_PROPERTY_TEST = "test"; + private static final String MESSAGE_PROPERTY_DUMMY = "dummy"; + private static final String SELECTOR_ARGUMENT = AMQPFilterTypes.JMS_SELECTOR.toString(); + private static final String SELECTOR = MESSAGE_PROPERTY_TEST + "='test'"; + private static final String VIRTUAL_HOST = "test"; + + private JMXTestUtils _jmxUtils; + private ManagedBroker _managedBroker; + private String _testQueueName; + private ManagedExchange _directExchange; + private ManagedExchange _topicExchange; + private ManagedExchange _fanoutExchange; + private ManagedExchange _headersExchange; + private Connection _connection; + private Session _session; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + // to test exchange selectors the publishing of unroutable messages should be allowed + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + + _jmxUtils.open(); + + _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + _testQueueName = getTestName(); + _managedBroker.createNewQueue(_testQueueName, null, true); + _directExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + _topicExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + _fanoutExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME); + _headersExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME); + + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + public void testCreateNewBindingWithArgumentsOnDirectExchange() throws Exception + { + String bindingKey = "test-direct-binding"; + + _directExchange.createNewBinding(_testQueueName, bindingKey, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue(bindingKey)); + } + + public void testCreateNewBindingWithArgumentsOnTopicExchange() throws Exception + { + String bindingKey = "test-topic-binding"; + + _topicExchange.createNewBinding(_testQueueName, bindingKey, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createTopic(bindingKey)); + } + + public void testCreateNewBindingWithArgumentsOnFanoutExchange() throws Exception + { + _fanoutExchange.createNewBinding(_testQueueName, null, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue("fanout://amq.fanout//?routingkey='routing-key-must-not-be-null'")); + } + + public void testCreateNewBindingWithArgumentsOnHeadersExchange() throws Exception + { + // headers exchange uses 'dummy' property to match messages + // i.e. all test messages have matching header value + _headersExchange.createNewBinding(_testQueueName, "x-match=any,dummy=test", + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue("headers://amq.match//?routingkey='routing-key-must-not-be-null'")); + } + + private void bindingTest(Destination destination) throws JMSException + { + publishMessages(destination, 4); + receiveAndAssertMessages(2); + } + + private void publishMessages(Destination destination, int messageNumber) throws JMSException + { + MessageProducer producer = _session.createProducer(destination); + + for (int i = 0; i < messageNumber; i++) + { + Message m = _session.createMessage(); + m.setStringProperty(MESSAGE_PROPERTY_TEST, i % 2 == 0 ? MESSAGE_PROPERTY_TEST : ""); + m.setIntProperty(MESSAGE_PROPERTY_INDEX, i); + m.setStringProperty(MESSAGE_PROPERTY_DUMMY, "test"); + producer.send(m); + } + _session.commit(); + } + + private void receiveAndAssertMessages(int messageNumber) throws JMSException + { + MessageConsumer consumer = _session.createConsumer(_session.createQueue(_testQueueName)); + + for (int i = 0; i < messageNumber; i++) + { + int index = i * 2; + Message message = consumer.receive(1000l); + assertNotNull("Expected message is not received at " + i, message); + assertEquals("Unexpected test property at " + i, MESSAGE_PROPERTY_TEST, + message.getStringProperty(MESSAGE_PROPERTY_TEST)); + assertEquals("Unexpected index property at " + i, index, message.getIntProperty(MESSAGE_PROPERTY_INDEX)); + } + + Message message = consumer.receive(1000l); + assertNull("Unexpected message received", message); + _session.commit(); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/LoggingManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/LoggingManagementTest.java new file mode 100644 index 0000000000..3717c1594d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/LoggingManagementTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.File; +import java.util.List; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.qpid.management.common.mbeans.LoggingManagement; +import org.apache.qpid.server.logging.log4j.LoggingManagementFacadeTest; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.LogMonitor; + +/** + * System test for Logging Management. <b>These tests rely on value set within + * test-profiles/log4j-test.xml</b>. + * + * @see LoggingManagementMBeanTest + * @see LoggingManagementFacadeTest + * + */ +public class LoggingManagementTest extends QpidBrokerTestCase +{ + private JMXTestUtils _jmxUtils; + private LoggingManagement _loggingManagement; + private LogMonitor _monitor; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + // System test normally run with log for4j test config from beneath test-profiles. We need to + // copy it as some of our tests write to this file. + + File tmpLogFile = File.createTempFile("log4j" + "." + getName(), ".xml"); + tmpLogFile.deleteOnExit(); + FileUtils.copy(getBrokerCommandLog4JFile(), tmpLogFile); + setBrokerCommandLog4JFile(tmpLogFile); + + super.setUp(); + _jmxUtils.open(); + + _loggingManagement = _jmxUtils.getLoggingManagement(); + _monitor = new LogMonitor(_outputFile); + } + + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + public void testViewEffectiveRuntimeLoggerLevels() throws Exception + { + final String qpidMainLogger = "org.apache.qpid"; + + TabularData table = _loggingManagement.viewEffectiveRuntimeLoggerLevels(); + final CompositeData row = table.get(new String[] {qpidMainLogger} ); + assertChannelRow(row, qpidMainLogger, "DEBUG"); + } + + public void testViewConfigFileLoggerLevels() throws Exception + { + final String operationalLoggingLogger = "qpid.message"; + + TabularData table = _loggingManagement.viewConfigFileLoggerLevels(); + final CompositeData row = table.get(new String[] {operationalLoggingLogger} ); + assertChannelRow(row, operationalLoggingLogger, "INFO"); + } + + public void testTurnOffOrgApacheQpidAtRuntime() throws Exception + { + final String logger = "org.apache.qpid"; + _monitor.markDiscardPoint(); + _loggingManagement.setRuntimeLoggerLevel(logger, "OFF"); + + List<String> matches = _monitor.waitAndFindMatches("Setting level to OFF for logger 'org.apache.qpid'", 5000); + assertEquals(1, matches.size()); + + TabularData table = _loggingManagement.viewEffectiveRuntimeLoggerLevels(); + final CompositeData row1 = table.get(new String[] {logger} ); + assertChannelRow(row1, logger, "OFF"); + } + + public void testChangesToConfigFileBecomeEffectiveAfterReload() throws Exception + { + final String operationalLoggingLogger = "qpid.message"; + assertEffectiveLoggingLevel(operationalLoggingLogger, "INFO"); + + _monitor.markDiscardPoint(); + _loggingManagement.setConfigFileLoggerLevel(operationalLoggingLogger, "OFF"); + + List<String> matches = _monitor.waitAndFindMatches("Setting level to OFF for logger 'qpid.message'", 5000); + assertEquals(1, matches.size()); + + assertEffectiveLoggingLevel(operationalLoggingLogger, "INFO"); + + _loggingManagement.reloadConfigFile(); + + assertEffectiveLoggingLevel(operationalLoggingLogger, "OFF"); + } + + private void assertEffectiveLoggingLevel(String operationalLoggingLogger, String expectedLevel) + { + TabularData table = _loggingManagement.viewEffectiveRuntimeLoggerLevels(); + final CompositeData row1 = table.get(new String[] {operationalLoggingLogger} ); + assertChannelRow(row1, operationalLoggingLogger, expectedLevel); + } + + private void assertChannelRow(final CompositeData row, String logger, String level) + { + assertNotNull("No row for " + logger, row); + assertEquals("Unexpected logger name", logger, row.get(LoggingManagement.LOGGER_NAME)); + assertEquals("Unexpected level", level, row.get(LoggingManagement.LOGGER_LEVEL)); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/MBeanLifeCycleTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/MBeanLifeCycleTest.java new file mode 100644 index 0000000000..71f911627e --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/MBeanLifeCycleTest.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.management.ObjectName; +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager; +import org.apache.qpid.server.virtualhost.ProvidedStoreVirtualHostImpl; +import org.apache.qpid.server.virtualhostnode.memory.MemoryVirtualHostNode; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +public class MBeanLifeCycleTest extends QpidRestTestCase +{ + private final static String TEST_VIRTUAL_HOST_MBEAN_SEARCH_QUERY = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + + ObjectName.quote(TEST2_VIRTUALHOST); + private JMXTestUtils _jmxUtils; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _jmxUtils = new JMXTestUtils(this); + _jmxUtils.open(); + } + + @Override + protected void customizeConfiguration() throws IOException + { + TestBrokerConfiguration config = getBrokerConfiguration(); + config.addHttpManagementConfiguration(); + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, getRestTestHelper().getHttpPort()); + + Map<String, Object> anonymousProviderAttributes = new HashMap<String, Object>(); + anonymousProviderAttributes.put(AuthenticationProvider.TYPE, AnonymousAuthenticationManager.PROVIDER_TYPE); + anonymousProviderAttributes.put(AuthenticationProvider.NAME, ANONYMOUS_AUTHENTICATION_PROVIDER); + config.addObjectConfiguration(AuthenticationProvider.class, anonymousProviderAttributes); + + // set password authentication provider on http port for the tests + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER, + TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); + config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + getBrokerConfiguration().addJmxManagementConfiguration(); + } + + @Override + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + public void testVirtualHostMBeanIsRegisteredOnVirtualHostCreation() throws Exception + { + String nodeName = "ntmp"; + String hostName = "htmp"; + + Map<String, Object> nodeData = new HashMap<>(); + nodeData.put(VirtualHostNode.NAME, nodeName); + nodeData.put(VirtualHostNode.TYPE, MemoryVirtualHostNode.VIRTUAL_HOST_NODE_TYPE); + getRestTestHelper().submitRequest("virtualhostnode/" + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED); + + Map<String, Object> virtualhostData = new HashMap<>(); + virtualhostData.put(VirtualHost.NAME, nodeName); + virtualhostData.put(VirtualHost.TYPE, ProvidedStoreVirtualHostImpl.VIRTUAL_HOST_TYPE); + getRestTestHelper().submitRequest("virtualhost/" + nodeName + "/" + hostName, + "PUT", + virtualhostData, + HttpServletResponse.SC_CREATED); + + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker(hostName); + assertNotNull("Host mBean is not created", managedBroker); + } + + public void testVirtualHostMBeanIsUnregisteredOnVirtualHostDeletion() throws Exception + { + boolean mBeanExists =_jmxUtils.doesManagedObjectExist(TEST_VIRTUAL_HOST_MBEAN_SEARCH_QUERY); + assertTrue("Host mBean is not registered", mBeanExists); + + getRestTestHelper().submitRequest("virtualhostnode/" + TEST2_VIRTUALHOST, "DELETE", HttpServletResponse.SC_OK); + + mBeanExists =_jmxUtils.doesManagedObjectExist(TEST_VIRTUAL_HOST_MBEAN_SEARCH_QUERY); + assertFalse("Host mBean is not unregistered", mBeanExists); + } + + public void testVirtualHostMBeanIsUnregisteredOnVirtualHostNodeStop() throws Exception + { + boolean mBeanExists =_jmxUtils.doesManagedObjectExist(TEST_VIRTUAL_HOST_MBEAN_SEARCH_QUERY); + assertTrue("Host mBean is not registered", mBeanExists); + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker(TEST2_VIRTUALHOST); + assertNotNull("Host mBean is not created", managedBroker); + + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(VirtualHostNode.NAME, TEST2_VIRTUALHOST); + nodeData.put(VirtualHostNode.DESIRED_STATE, State.STOPPED.name()); + + int status = getRestTestHelper().submitRequest("virtualhostnode/" + TEST2_VIRTUALHOST, "PUT", nodeData); + assertEquals("Unexpected code", 200, status); + + mBeanExists =_jmxUtils.doesManagedObjectExist(TEST_VIRTUAL_HOST_MBEAN_SEARCH_QUERY); + assertFalse("Host mBean is not unregistered", mBeanExists); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java new file mode 100644 index 0000000000..4358b4b450 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementActorLoggingTest.java @@ -0,0 +1,482 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; +import org.apache.qpid.test.utils.JMXTestUtils; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.management.JMException; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test class to test if any change in the broker JMX code is affesting the management console + * There are some hardcoding of management feature names and parameter names to create a customized + * look in the console. + */ +public class ManagementActorLoggingTest extends AbstractTestLogging +{ + private JMXTestUtils _jmxUtils; + private boolean _closed = false; + + @Override + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + if(!_closed) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + /** + * Description: + * When a connected client has its connection closed via the Management Console this will be logged as a CON-1002 message. + * Input: + * + * 1. Running Broker + * 2. Connected Client + * 3. Connection is closed via Management Console + * Output: + * + * <date> CON-1002 : Close + * + * Validation Steps: + * 4. The CON ID is correct + * 5. This must be the last CON message for the Connection + * 6. It must be preceded by a CON-1001 for this Connection + * + * @throws Exception - {@see ManagedConnection.closeConnection and #getConnection} + * @throws java.io.IOException - if there is a problem reseting the log monitor + */ + public void testConnectionCloseViaManagement() throws IOException, Exception + { + //Create a connection to the broker + Connection connection = getConnection(); + + // Monitor the connection for an exception being thrown + // this should be a DisconnectionException but it is not this tests + // job to valiate that. Only use the exception as a synchronisation + // to check the log file for the Close message + final CountDownLatch exceptionReceived = new CountDownLatch(1); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + //Failover being attempted. + exceptionReceived.countDown(); + } + }); + + //Remove the connection close from any 0-10 connections + _monitor.markDiscardPoint(); + + // Get a managedConnection + ManagedConnection mangedConnection = _jmxUtils.getManagedObject(ManagedConnection.class, "org.apache.qpid:type=VirtualHost.Connection,*"); + + //Close the connection + mangedConnection.closeConnection(); + + //Wait for the connection to close + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); + + //Validate results + List<String> results = waitAndFindMatches("CON-1002"); + + assertEquals("Unexpected Connection Close count", 1, results.size()); + } + + /** + * Description: + * Exchange creation is possible from the Management Console. + * When an exchanged is created in this way then a EXH-1001 create message + * is expected to be logged. + * Input: + * + * 1. Running broker + * 2. Connected Management Console + * 3. Exchange Created via Management Console + * Output: + * + * EXH-1001 : Create : [Durable] Type:<value> Name:<value> + * + * Validation Steps: + * 4. The EXH ID is correct + * 5. The correct tags are present in the message based on the create options + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testCreateExchangeDirectTransientViaManagementConsole() throws IOException, JMException + { + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "direct", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testCreateExchangeTopicTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "topic", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + public void testCreateExchangeFanoutTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "fanout", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + public void testCreateExchangeHeadersTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous exchange declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "headers", false); + + // Validate + + //1 - ID is correct + List<String> results = waitAndFindMatches("EXH-1001"); + + assertEquals("More than one exchange creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct exchange name + assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName())); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + /** + * Description: + * Queue creation is possible from the Management Console. When a queue is created in this way then a QUE-1001 create message is expected to be logged. + * Input: + * + * 1. Running broker + * 2. Connected Management Console + * 3. Queue Created via Management Console + * Output: + * + * <date> QUE-1001 : Create : Transient Owner:<name> + * + * Validation Steps: + * 4. The QUE ID is correct + * 5. The correct tags are present in the message based on the create options + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testCreateQueueTransientViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + // Validate + + List<String> results = waitAndFindMatches("QUE-1001"); + + assertEquals("More than one queue creation found", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct queue name + String subject = fromSubject(log); + assertEquals("Incorrect queue name created", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + /** + * Description: + * The ManagementConsole can be used to delete a queue. When this is done a QUE-1002 Deleted message must be logged. + * Input: + * + * 1. Running Broker + * 2. Queue created on the broker with no subscribers + * 3. Management Console connected + * 4. Queue is deleted via Management Console + * Output: + * + * <date> QUE-1002 : Deleted + * + * Validation Steps: + * 5. The QUE ID is correct + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue} + */ + public void testQueueDeleteViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); + + managedBroker.deleteQueue(getName()); + + List<String> results = waitAndFindMatches("QUE-1002"); + + assertEquals("More than one queue deletion found", 1, results.size()); + + String log = getLog(results.get(0)); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in delete", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + + } + + /** + * Description: + * The binding of a Queue and an Exchange is done via a Binding. When this Binding is created via the Management Console a BND-1001 Create message will be logged. + * Input: + * + * 1. Running Broker + * 2. Connected Management Console + * 3. Use Management Console to perform binding + * Output: + * + * <date> BND-1001 : Create + * + * Validation Steps: + * 4. The BND ID is correct + * 5. This will be the first message for the given binding + * + * @throws java.io.IOException - if there is a problem reseting the log monitor + * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.createNewBinding} + */ + public void testBindingCreateOnDirectViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.direct"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testBindingCreateOnTopicViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.topic"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + public void testBindingCreateOnFanoutViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createQueue("test", getName(), null, false); + + ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.fanout"); + + managedExchange.createNewBinding(getName(), getName()); + + List<String> results = waitAndFindMatches("BND-1001"); + + assertEquals("Unexpected number of bindings logged", 1, results.size()); + + String log = getLogMessage(results, 0); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject)); + assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + + /** + * Description: + * Bindings can be deleted so that a queue can be rebound with a different set of values. This can be performed via the Management Console + * Input: + * + * 1. Running Broker + * 2. Management Console connected + * 3. Management Console is used to perform unbind. + * Output: + * + * <date> BND-1002 : Deleted + * + * Validation Steps: + * 4. The BND ID is correct + * 5. There must have been a BND-1001 Create message first. + * 6. This will be the last message for the given binding + * + * @throws java.io.IOException - if there is a problem reseting the log monitor or an issue with the JMX Connection + * @throws javax.management.JMException - {@see #createExchange and ManagedBroker.unregisterExchange} + */ + public void testUnRegisterExchangeViaManagementConsole() throws IOException, JMException + { + //Remove any previous queue declares + _monitor.markDiscardPoint(); + + _jmxUtils.createExchange("test", getName(), "direct", false); + + ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test"); + + managedBroker.unregisterExchange(getName()); + + List<String> results = waitAndFindMatches("EXH-1002"); + + assertEquals("More than one exchange deletion found", 1, results.size()); + + String log = getLog(results.get(0)); + + // Validate correct binding + String subject = fromSubject(log); + assertEquals("Incorrect exchange named in delete", "direct/" + getName(), AbstractTestLogSubject.getSlice("ex", subject)); + + // Validate it was a management actor. + String actor = fromActor(log); + assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng")); + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java new file mode 100644 index 0000000000..cb6eae013e --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ManagementLoggingTest.java @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + + +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.LogMonitor; + +/** + * Management Console Test Suite + * + * The Management Console test suite validates that the follow log messages as specified in the Functional Specification. + * + * This suite of tests validate that the management console messages occur correctly and according to the following format: + * + * MNG-1001 : <type> Management Startup + * MNG-1002 : Starting : <service> : Listening on port <Port> + * MNG-1003 : Shutting down : <service> : port <Port> + * MNG-1004 : <type> Management Ready + * MNG-1005 : <type> Management Stopped + * MNG-1006 : Using SSL Keystore : <path> + * MNG-1007 : Open : User <username> + * MNG-1008 : Close : User <username> + */ +public class ManagementLoggingTest extends AbstractTestLogging +{ + private static final String MNG_PREFIX = "MNG-"; + + public void setUp() throws Exception + { + setLogMessagePrefix(); + + // We either do this here or have a null check in tearDown. + // As when this test is run against profiles other than java it will NPE + _monitor = new LogMonitor(_outputFile); + //We explicitly do not call super.setUp as starting up the broker is + //part of the test case. + + } + + /** + * Description: + * Using the startup configuration validate that the management startup + * message is logged correctly. + * Input: + * Standard configuration with management enabled + * Output: + * + * <date> MNG-1001 : Startup + * + * Constraints: + * This is the FIRST message logged by MNG + * Validation Steps: + * + * 1. The BRK ID is correct + * 2. This is the FIRST message logged by MNG + */ + public void testManagementStartupEnabled() throws Exception + { + // This test only works on java brokers + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + // Ensure we have received the MNG log msg. + waitForMessage("MNG-1001"); + + List<String> results = findMatches(MNG_PREFIX); + // Validation + + assertTrue("MNGer message not logged", results.size() > 0); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1001", log); + + //2 + //There will be 2 copies of the startup message (one via SystemOut, and one via Log4J) + results = findMatches("MNG-1001"); + assertEquals("Unexpected startup message count.", + 2, results.size()); + + //3 + assertEquals("Startup log message is not 'Startup'.", "JMX Management Startup", + getMessageString(log)); + } + } + + /** + * Description: + * Verify that when management is disabled in the configuration file the + * startup message is not logged. + * Input: + * Standard configuration with management disabled + * Output: + * NO MNG messages + * Validation Steps: + * + * 1. Validate that no MNG messages are produced. + */ + public void testManagementStartupDisabled() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(false, false); + + List<String> results = findMatches(MNG_PREFIX); + // Validation + + assertEquals("MNGer messages logged", 0, results.size()); + } + } + + /** + * The two MNG-1002 messages are logged at the same time so lets test them + * at the same time. + * + * Description: + * Using the default configuration validate that the RMI Registry socket is + * correctly reported as being opened + * + * Input: + * The default configuration file + * Output: + * + * <date> MESSAGE MNG-1002 : Starting : RMI Registry : Listening on port 8999 + * + * Constraints: + * The RMI ConnectorServer and Registry log messages do not have a prescribed order + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The specified port is the correct '8999' + * + * Description: + * Using the default configuration validate that the RMI ConnectorServer + * socket is correctly reported as being opened + * + * Input: + * The default configuration file + * Output: + * + * <date> MESSAGE MNG-1002 : Starting : RMI ConnectorServer : Listening on port 9099 + * + * Constraints: + * The RMI ConnectorServer and Registry log messages do not have a prescribed order + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The specified port is the correct '9099' + */ + public void testManagementStartupRMIEntries() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + List<String> results = waitAndFindMatches("MNG-1002"); + // Validation + + //There will be 4 startup messages (two via SystemOut, and two via Log4J) + assertEquals("Unexpected MNG-1002 message count", 4, results.size()); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1002", log); + + //Check the RMI Registry port is as expected + int mPort = getManagementPort(getPort()); + assertTrue("RMI Registry port not as expected(" + mPort + ").:" + getMessageString(log), + getMessageString(log).endsWith(String.valueOf(mPort))); + + log = getLogMessage(results, 2); + + //1 + validateMessageID("MNG-1002", log); + + // We expect the RMI Registry port (the defined 'management port') to be + // 100 lower than the JMX RMIConnector Server Port (the actual JMX server) + int jmxPort = mPort + JMXPORT_CONNECTORSERVER_OFFSET; + assertTrue("JMX RMIConnectorServer port not as expected(" + jmxPort + ").:" + getMessageString(log), + getMessageString(log).endsWith(String.valueOf(jmxPort))); + } + } + + /** + * Description: + * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006 + * Input: + * Management SSL enabled default configuration. + * Output: + * + * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks + * + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The keystore path is as specified in the configuration + */ + public void testManagementStartupSSLKeystore() throws Exception + { + if (isJavaBroker()) + { + setSystemProperty("javax.net.debug", "ssl"); + startBrokerAndCreateMonitor(true, true); + + List<String> results = waitAndFindMatches("MNG-1006"); + + assertTrue("MNGer message not logged", results.size() > 0); + + String log = getLogMessage(results, 0); + + //1 + validateMessageID("MNG-1006", log); + + // Validate we only have two MNG-1002 (one via stdout, one via log4j) + results = findMatches("MNG-1006"); + assertEquals("Upexpected SSL Keystore message count", + 2, results.size()); + + // Validate the keystore path is as expected + assertTrue("SSL Keystore entry expected.:" + getMessageString(log), + getMessageString(log).endsWith("systestsKeyStore")); + } + } + + /** + * Description: Tests the management connection open/close are logged correctly. + * + * Output: + * + * <date> MESSAGE MNG-1007 : Open : User <username> + * <date> MESSAGE MNG-1008 : Close : User <username> + * + * Validation Steps: + * + * 1. The MNG ID is correct + * 2. The message and username are correct + */ + public void testManagementUserOpenClose() throws Exception + { + if (isJavaBroker()) + { + startBrokerAndCreateMonitor(true, false); + + final JMXTestUtils jmxUtils = new JMXTestUtils(this); + List<String> openResults = null; + List<String> closeResults = null; + try + { + jmxUtils.open(); + openResults = waitAndFindMatches("MNG-1007"); + } + finally + { + if (jmxUtils != null) + { + jmxUtils.close(); + closeResults = waitAndFindMatches("MNG-1008"); + } + } + + assertNotNull("Management Open results null", openResults.size()); + assertEquals("Management Open logged unexpected number of times", 1, openResults.size()); + + assertNotNull("Management Close results null", closeResults.size()); + assertEquals("Management Close logged unexpected number of times", 1, closeResults.size()); + + final String openMessage = getMessageString(getLogMessage(openResults, 0)); + assertTrue("Unexpected open message " + openMessage, openMessage.endsWith("Open : User admin")); + final String closeMessage = getMessageString(getLogMessage(closeResults, 0)); + assertTrue("Unexpected close message " + closeMessage, closeMessage.endsWith("Close : User admin")); + } + } + + private void startBrokerAndCreateMonitor(boolean managementEnabled, boolean useManagementSSL) throws Exception + { + TestBrokerConfiguration config = getBrokerConfiguration(); + + if (managementEnabled) + { + config.addJmxManagementConfiguration(); + } + + if(useManagementSSL) + { + // This test requires we have ssl, change the transport and add they keystore to the port config + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_JMX_PORT, Port.TRANSPORTS, Collections.singleton(Transport.SSL)); + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_JMX_PORT, Port.KEY_STORE, TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE); + } + + startBroker(); + + // Now we can create the monitor as _outputFile will now be defined + _monitor = new LogMonitor(_outputFile); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java new file mode 100644 index 0000000000..d0f133aa73 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -0,0 +1,869 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import javax.naming.NamingException; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.queue.NotificationCheckTest; +import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.queue.StandardQueueImpl; +import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Tests the JMX API for the Managed Queue. + * + */ +public class QueueManagementTest extends QpidBrokerTestCase +{ + + private static final Logger LOGGER = Logger.getLogger(QueueManagementTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final String TEST_QUEUE_DESCRIPTION = "my description"; + + private JMXTestUtils _jmxUtils; + private Connection _connection; + private Session _session; + + private String _sourceQueueName; + private String _destinationQueueName; + private Destination _sourceQueue; + private Destination _destinationQueue; + private ManagedQueue _managedSourceQueue; + private ManagedQueue _managedDestinationQueue; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _sourceQueueName = getTestQueueName() + "_src"; + _destinationQueueName = getTestQueueName() + "_dest"; + + createConnectionAndSession(); + + _sourceQueue = _session.createQueue(_sourceQueueName); + _destinationQueue = _session.createQueue(_destinationQueueName); + createQueueOnBroker(_sourceQueue); + createQueueOnBroker(_destinationQueue); + + _jmxUtils.open(); + + createManagementInterfacesForQueues(); + } + + public void tearDown() throws Exception + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + super.tearDown(); + } + + public void testQueueAttributes() throws Exception + { + Queue queue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(queue); + + final String queueName = queue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected name", queueName, managedQueue.getName()); + assertEquals("Unexpected queue type", "standard", managedQueue.getQueueType()); + } + + public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception + { + final String subName = "testOwner"; + _session.createDurableSubscriber(getTestTopic(), subName); + + final String queueName = _connection.getClientID() + ":" + subName; + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNotNull(_connection.getClientID()); + assertEquals("Unexpected owner", _connection.getClientID(), managedQueue.getOwner()); + } + + public void testNonExclusiveQueueHasNoOwner() throws Exception + { + Queue nonExclusiveQueue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(nonExclusiveQueue); + + final String queueName = nonExclusiveQueue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNull("Unexpected owner", managedQueue.getOwner()); + } + + public void testSetNewQueueDescriptionOnExistingQueue() throws Exception + { + Queue queue = _session.createQueue(getTestQueueName()); + createQueueOnBroker(queue); + + final String queueName = queue.getQueueName(); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertNull("Unexpected description", managedQueue.getDescription()); + + managedQueue.setDescription(TEST_QUEUE_DESCRIPTION); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + public void testNewQueueWithDescription() throws Exception + { + String queueName = getTestQueueName(); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + /** + * Requires persistent store. + */ + public void testQueueDescriptionSurvivesRestart() throws Exception + { + String queueName = getTestQueueName(); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_QUEUE_DESCRIPTION); + + ((AMQSession<?, ?>)_session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + + restartBroker(); + + managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription()); + } + + /** + * Tests queue creation with {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests + * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}. + */ + public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Integer deliveryCount = 1; + final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount); + managedBroker.createNewQueue(queueName, null, true, arguments); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); + } + + public void testCreateQueueWithAlertingThresholdsSet() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Long maximumMessageCount = 100l; + final Long maximumMessageSize = 200l; + final Long maximumQueueDepth = 300l; + final Long maximumMessageAge = 400l; + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth); + arguments.put(QueueArgumentsConverter.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge); + + managedBroker.createNewQueue(queueName, null, true, arguments); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected maximum message count", maximumMessageCount, managedQueue.getMaximumMessageCount()); + assertEquals("Unexpected maximum message size", maximumMessageSize, managedQueue.getMaximumMessageSize()); + assertEquals("Unexpected maximum queue depth", maximumQueueDepth, managedQueue.getMaximumQueueDepth()); + assertEquals("Unexpected maximum message age", maximumMessageAge, managedQueue.getMaximumMessageAge()); + } + + /** + * Requires 0-10 as relies on ADDR addresses. + * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange + */ + public void testGetSetAlternateExchange() throws Exception + { + String queueName = getTestQueueName(); + String altExchange = "amq.fanout"; + String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); + Queue queue = _session.createQueue(addrWithAltExch); + + createQueueOnBroker(queue); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); + + String newAltExch = "amq.topic"; + managedQueue.setAlternateExchange(newAltExch); + assertEquals("Unexpected alternate exchange after set", newAltExch, managedQueue.getAlternateExchange()); + } + + /** + * Requires 0-10 as relies on ADDR addresses. + */ + public void testRemoveAlternateExchange() throws Exception + { + String queueName = getTestQueueName(); + String altExchange = "amq.fanout"; + String addrWithAltExch = String.format("ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName, altExchange); + Queue queue = _session.createQueue(addrWithAltExch); + + createQueueOnBroker(queue); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Newly created queue does not have expected alternate exchange", altExchange, managedQueue.getAlternateExchange()); + + managedQueue.setAlternateExchange(""); + assertNull("Unexpected alternate exchange after set", managedQueue.getAlternateExchange()); + } + + /** + * Requires persistent store + * Requires 0-10 as relies on ADDR addresses. + */ + public void testAlternateExchangeSurvivesRestart() throws Exception + { + String nonMandatoryExchangeName = "exch" + getName(); + + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true); + + String queueName1 = getTestQueueName() + "1"; + String altExchange1 = "amq.fanout"; + String addr1WithAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}", queueName1, altExchange1); + Queue queue1 = _session.createQueue(addr1WithAltExch); + + String queueName2 = getTestQueueName() + "2"; + String addr2WithoutAltExch = String.format("ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2); + Queue queue2 = _session.createQueue(addr2WithoutAltExch); + + createQueueOnBroker(queue1); + createQueueOnBroker(queue2); + + ManagedQueue managedQueue1 = _jmxUtils.getManagedQueue(queueName1); + assertEquals("Newly created queue1 does not have expected alternate exchange", altExchange1, managedQueue1.getAlternateExchange()); + + ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2); + assertNull("Newly created queue2 does not have expected alternate exchange", managedQueue2.getAlternateExchange()); + + String altExchange2 = nonMandatoryExchangeName; + managedQueue2.setAlternateExchange(altExchange2); + + restartBroker(); + + managedQueue1 = _jmxUtils.getManagedQueue(queueName1); + assertEquals("Queue1 does not have expected alternate exchange after restart", altExchange1, managedQueue1.getAlternateExchange()); + + managedQueue2 = _jmxUtils.getManagedQueue(queueName2); + assertEquals("Queue2 does not have expected updated alternate exchange after restart", altExchange2, managedQueue2.getAlternateExchange()); + } + + /** + * Tests the ability to receive queue alerts as JMX notifications. + * + * @see NotificationCheckTest + * @see SimpleAMQQueueTest#testNotificationFiredAsync() + * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue() + */ + public void testQueueNotification() throws Exception + { + final String queueName = getName(); + final long maximumMessageCount = 3; + + Queue queue = _session.createQueue(queueName); + createQueueOnBroker(queue); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + managedQueue.setMaximumMessageCount(maximumMessageCount); + + RecordingNotificationListener listener = new RecordingNotificationListener(1); + + _jmxUtils.addNotificationListener(_jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName), listener, null, null); + + // Send two messages - this should *not* trigger the notification + sendMessage(_session, queue, 2); + + assertEquals("Premature notification received", 0, listener.getNumberOfNotificationsReceived()); + + // A further message should trigger the message count alert + sendMessage(_session, queue, 1); + + listener.awaitExpectedNotifications(5, TimeUnit.SECONDS); + + assertEquals("Unexpected number of JMX notifications received", 1, listener.getNumberOfNotificationsReceived()); + + Notification notification = listener.getLastNotification(); + assertEquals("Unexpected notification message", "MESSAGE_COUNT_ALERT 3: Maximum count on queue threshold (3) breached.", notification.getMessage()); + } + + /** + * Tests {@link ManagedQueue#viewMessages(long, long)} interface. + */ + public void testViewSingleMessage() throws Exception + { + final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); + syncSession(_session); + final Message sentMessage = sentMessages.get(0); + + assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); + + // Check the contents of the message + final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); + assertEquals("Unexpected number of rows in table", 1, tab.size()); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + + final CompositeData row1 = rowItr.next(); + assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); + assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); + assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); + + // Check the contents of header (encoded in a string array) + final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER); + assertNotNull("Expected message header array", headerArray); + final Map<String, String> headers = headerArrayToMap(headerArray); + + final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID(); + final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(ManagedQueue.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp()); + assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID")); + assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority")); + assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp")); + } + + /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); + + // Now move a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", + 3, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 5, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Attempt to copy mixture of messages already on and some not already on the queue + + fromMessageId = amqMessagesIds.get(5); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 7, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6); + + + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); + AtomicInteger totalConsumed = new AtomicInteger(0); + startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); + + boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Did not read half of messages within time allowed", halfwayPointReached); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + asyncConnection.stop(); + + // The exact number of messages moved will be non deterministic, as the number of messages processed + // by the consumer cannot be predicted. There is also the possibility that a message can remain + // on the source queue. This situation will arise if a message has been acquired by the consumer, but not + // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs. + // + // The number of messages moved + the number consumed + any messages remaining on source should + // *always* be equal to the number we originally sent. + + int numberOfMessagesReadByConsumer = totalConsumed.intValue(); + int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); + int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); + + LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer + + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue + + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue); + assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + AtomicInteger totalConsumed = new AtomicInteger(0); + CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); + startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); + assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); + } + + /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception + { + final int numberOfMessagesToSend = 1; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + restartBroker(); + + createManagementInterfacesForQueues(); + createConnectionAndSession(); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move messages to destination + long messageId = amqMessagesIds.get(0); + _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0); + } + + /** + * Tests {@link ManagedQueue#deleteMessages(long, long)} interface. + */ + public void testDeleteMessages() throws Exception + { + final int numberOfMessagesToSend = 15; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + // Current expected queue state, in terms of message header indices: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14] + + // Delete the first message (Remember the amqMessagesIds list, and the message indices added as a property when sending, are both 0-based index) + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = fromMessageId; + _managedSourceQueue.deleteMessages(fromMessageId, toMessageId); + assertEquals("Unexpected message count after first deletion", numberOfMessagesToSend - 1, _managedSourceQueue.getMessageCount().intValue()); + // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,8,9,10,11,12,13,14] + + // Delete the 9th-10th messages, in the middle of the queue + fromMessageId = amqMessagesIds.get(8); + toMessageId = amqMessagesIds.get(9); + _managedSourceQueue.deleteMessages(fromMessageId, toMessageId); + assertEquals("Unexpected message count after third deletion", numberOfMessagesToSend - 3, _managedSourceQueue.getMessageCount().intValue()); + // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,X,X,10,11,12,13,14] + + // Delete the 11th and 12th messages, but still include the IDs for the 9th and 10th messages in the + // range to ensure their IDs are 'skipped' until the matching messages are found + fromMessageId = amqMessagesIds.get(8); + toMessageId = amqMessagesIds.get(11); + _managedSourceQueue.deleteMessages(fromMessageId, toMessageId); + assertEquals("Unexpected message count after fourth deletion", numberOfMessagesToSend - 5, _managedSourceQueue.getMessageCount().intValue()); + // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,X,X,X,X,12,13,14] + + // Delete the 8th message and the 13th message, including the IDs for the 9th-12th messages in the + // range to ensure their IDs are 'skipped' and the other matching message is found + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(12); + _managedSourceQueue.deleteMessages(fromMessageId, toMessageId); + assertEquals("Unexpected message count after fourth deletion", numberOfMessagesToSend - 7, _managedSourceQueue.getMessageCount().intValue()); + // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,X,X,X,X,X,X,13,14] + + // Delete the last message message + fromMessageId = amqMessagesIds.get(numberOfMessagesToSend -1); + toMessageId = fromMessageId; + _managedSourceQueue.deleteMessages(fromMessageId, toMessageId); + assertEquals("Unexpected message count after second deletion", numberOfMessagesToSend - 8, _managedSourceQueue.getMessageCount().intValue()); + // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,X,X,X,X,X,X,13,X] + + // Verify the message indices with a consumer + assertMessageIndicesOn(_sourceQueue, 1,2,3,4,5,6,13); + } + + public void testGetMessageGroupKey() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Object messageGroupKey = "test"; + final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); + managedBroker.createNewQueue(queueName, null, true, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + + assertNotNull("Manager queue expected to be available", managedQueue); + assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey()); + assertEquals("Unexpected message group sharing", false, managedQueue.isMessageGroupSharedGroups()); + } + + public void testIsMessageGroupSharedGroups() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Object messageGroupKey = "test"; + final Map<String, Object> arguments = new HashMap<String, Object>(2); + arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueueImpl.SHARED_MSG_GROUP_ARG_VALUE); + managedBroker.createNewQueue(queueName, null, true, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + + assertNotNull("Manager queue expected to be available", managedQueue); + assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey()); + assertEquals("Unexpected message group sharing", true, managedQueue.isMessageGroupSharedGroups()); + } + + @Override + public Message createNextMessage(Session session, int messageNumber) throws JMSException + { + Message message = session.createTextMessage(getContentForMessageNumber(messageNumber)); + message.setIntProperty(INDEX, messageNumber); + return message; + } + + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, + final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception + { + Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + + @Override + public void onMessage(Message arg0) + { + totalConsumed.incrementAndGet(); + requiredNumberOfMessagesRead.countDown(); + } + }); + } + + private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception + { + MessageConsumer consumer = _session.createConsumer(queue); + + for (int i : expectedIndices) + { + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull("Expected message with index " + i, message); + assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + assertEquals("Expected message content", getContentForMessageNumber(i), message.getText()); + } + + assertNull("Unexpected message encountered", consumer.receive(1000)); + } + + private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception + { + final SortedSet<Long> messageIds = new TreeSet<Long>(); + + final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + while(rowItr.hasNext()) + { + final CompositeData row = rowItr.next(); + long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); + messageIds.add(amqMessageId); + } + + return new ArrayList<Long>(messageIds); + } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } + + private void createQueueOnBroker(Destination destination) throws JMSException + { + _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } + + private void syncSession(Session session) throws Exception + { + ((AMQSession<?,?>)session).sync(); + } + + private void createConnectionAndSession() throws JMSException, + NamingException + { + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + private void createManagementInterfacesForQueues() + { + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); + } + + private String getContentForMessageNumber(int msgCount) + { + return "Message count " + msgCount; + } + + private final class RecordingNotificationListener implements NotificationListener + { + private final CountDownLatch _notificationReceivedLatch; + private final AtomicInteger _numberOfNotifications; + private final AtomicReference<Notification> _lastNotification; + + private RecordingNotificationListener(int expectedNumberOfNotifications) + { + _notificationReceivedLatch = new CountDownLatch(expectedNumberOfNotifications); + _numberOfNotifications = new AtomicInteger(0); + _lastNotification = new AtomicReference<Notification>(); + } + + @Override + public void handleNotification(Notification notification, Object handback) + { + _lastNotification.set(notification); + _numberOfNotifications.incrementAndGet(); + _notificationReceivedLatch.countDown(); + } + + public int getNumberOfNotificationsReceived() + { + return _numberOfNotifications.get(); + } + + public Notification getLastNotification() + { + return _lastNotification.get(); + } + + public void awaitExpectedNotifications(long timeout, TimeUnit timeunit) throws InterruptedException + { + _notificationReceivedLatch.await(timeout, timeunit); + } + } + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java new file mode 100644 index 0000000000..4ea071f3ac --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/StatisticsTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.management.jmx; + +import java.util.List; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.ServerInformation; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class StatisticsTest extends QpidBrokerTestCase +{ + private static final String TEST_VIRTUALHOST1 = "test1"; + private static final String TEST_VIRTUALHOST2 = "test2"; + + private static final String TEST_USER = "admin"; + private static final String TEST_PASSWORD = "admin"; + private static final int MESSAGE_COUNT_TEST = 5; + private static final int MESSAGE_COUNT_DEV = 9; + + private JMXTestUtils _jmxUtils; + private Connection _vhost1Connection, _vhost2Connection; + private Session _vhost1Session, _vhost2Session; + private Queue _vhost1Queue, _vhost2Queue; + protected String _brokerUrl; + + @Override + public void setUp() throws Exception + { + createTestVirtualHostNode(0, TEST_VIRTUALHOST1); + createTestVirtualHostNode(0, TEST_VIRTUALHOST2); + + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this, TEST_USER, TEST_PASSWORD); + + super.setUp(); + + _brokerUrl = getBroker().toString(); + _vhost1Connection = new AMQConnection(_brokerUrl, TEST_USER, TEST_PASSWORD, "clientid", TEST_VIRTUALHOST1); + _vhost2Connection = new AMQConnection(_brokerUrl, TEST_USER, TEST_PASSWORD, "clientid", TEST_VIRTUALHOST2); + _vhost1Connection.start(); + _vhost2Connection.start(); + + _vhost1Session = _vhost1Connection.createSession(true, Session.SESSION_TRANSACTED); + _vhost2Session = _vhost2Connection.createSession(true, Session.SESSION_TRANSACTED); + + _vhost1Queue = _vhost2Session.createQueue(getTestQueueName()); + _vhost2Queue = _vhost1Session.createQueue(getTestQueueName()); + + //Create queues by opening and closing consumers + final MessageConsumer vhost1Consumer = _vhost1Session.createConsumer(_vhost2Queue); + vhost1Consumer.close(); + final MessageConsumer vhost2Consumer = _vhost2Session.createConsumer(_vhost1Queue); + vhost2Consumer.close(); + + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + _jmxUtils.close(); + + super.tearDown(); + } + + public void testInitialStatisticValues() throws Exception + { + //Check initial values + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST1, 0, 0, 0, 0); + checkVHostStatistics(TEST_VIRTUALHOST1, 0, 0, 0, 0); + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkBrokerStatistics(0, 0, 0, 0); + } + + public void testSendOnSingleVHost() throws Exception + { + sendMessagesAndSync(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + + //Check values + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + } + + public void testSendOnTwoVHosts() throws Exception + { + sendMessagesAndSync(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + sendMessagesAndSync(_vhost2Session, _vhost1Queue, MESSAGE_COUNT_DEV); + + //Check values + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, 0, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, 0); + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST2, MESSAGE_COUNT_DEV, 0, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, 0); + checkVHostStatistics(TEST_VIRTUALHOST2, MESSAGE_COUNT_DEV, 0, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, 0, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE, 0); + } + + public void testSendAndConsumeOnSingleVHost() throws Exception + { + sendMessagesAndSync(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + consumeMessages(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + + //Check values + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkVHostStatistics(TEST_VIRTUALHOST2, 0, 0, 0, 0); + checkBrokerStatistics(MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + } + + public void testSendAndConsumeOnTwoVHosts() throws Exception + { + sendMessagesAndSync(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + sendMessagesAndSync(_vhost2Session, _vhost1Queue, MESSAGE_COUNT_DEV); + consumeMessages(_vhost1Session, _vhost2Queue, MESSAGE_COUNT_TEST); + consumeMessages(_vhost2Session, _vhost1Queue, MESSAGE_COUNT_DEV); + + //Check values + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics(TEST_VIRTUALHOST1, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_TEST * DEFAULT_MESSAGE_SIZE); + checkSingleConnectionOnVHostStatistics(TEST_VIRTUALHOST2, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE); + checkVHostStatistics(TEST_VIRTUALHOST2, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE, MESSAGE_COUNT_DEV * DEFAULT_MESSAGE_SIZE); + checkBrokerStatistics(MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE, (MESSAGE_COUNT_TEST + MESSAGE_COUNT_DEV) * DEFAULT_MESSAGE_SIZE); + } + + private void sendMessagesAndSync(Session session, Queue queue, int numberOfMessages) throws Exception + { + //Send messages via connection on and sync + sendMessage(session, queue, numberOfMessages); + ((AMQSession<?,?>)session).sync(); + } + + private void consumeMessages(Session session, Queue queue, int numberOfMessages) throws Exception + { + //consume the messages on the virtual host + final MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0 ; i < numberOfMessages ; i++) + { + assertNotNull("an expected message was not received", consumer.receive(1500)); + } + session.commit(); + consumer.close(); + } + + private void checkSingleConnectionOnVHostStatistics(String vHostName, long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + List<ManagedConnection> managedConnections = _jmxUtils.getManagedConnections(vHostName); + assertEquals(1, managedConnections.size()); + + ManagedConnection managedConnection = managedConnections.get(0); + + assertEquals(messagesSent, managedConnection.getTotalMessagesReceived()); + assertEquals(messagesReceived, managedConnection.getTotalMessagesDelivered()); + + assertEquals(dataSent, managedConnection.getTotalDataReceived()); + assertEquals(dataReceived, managedConnection.getTotalDataDelivered()); + } + + private void checkVHostStatistics(String vHostName, long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + ManagedBroker vhost = _jmxUtils.getManagedBroker(vHostName); + + assertEquals(messagesSent, vhost.getTotalMessagesReceived()); + assertEquals(messagesReceived, vhost.getTotalMessagesDelivered()); + + assertEquals(dataSent, vhost.getTotalDataReceived()); + assertEquals(dataReceived, vhost.getTotalDataDelivered()); + } + + private void checkBrokerStatistics(long messagesSent, long messagesReceived, long dataSent, long dataReceived) + { + ServerInformation broker = _jmxUtils.getServerInformation(); + + assertEquals(messagesSent, broker.getTotalMessagesReceived()); + assertEquals(messagesReceived, broker.getTotalMessagesDelivered()); + + assertEquals(dataSent, broker.getTotalDataReceived()); + assertEquals(dataReceived, broker.getTotalDataDelivered()); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java new file mode 100644 index 0000000000..25b09f04c3 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; + +import org.apache.qpid.management.common.mbeans.UserManagement; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.security.auth.manager.PlainPasswordDatabaseAuthenticationManager; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.tools.security.Passwd; + +/** + * System test for User Management. + * + */ +public class UserManagementTest extends QpidBrokerTestCase +{ + private static final String TEST_NEWPASSWORD = "newpassword"; + private static final String TEST_PASSWORD = "password"; + private JMXTestUtils _jmxUtils; + private String _testUserName; + private File _passwordFile; + private UserManagement _userManagement; + private Passwd _passwd; + + public void setUp() throws Exception + { + _passwd = createPasswordEncodingUtility(); + _passwordFile = createTemporaryPasswordFileWithJmxAdminUser(); + + Map<String, Object> newAttributes = new HashMap<String, Object>(); + newAttributes.put(AuthenticationProvider.TYPE, getAuthenticationManagerType()); + newAttributes.put("path", _passwordFile.getAbsolutePath()); + getBrokerConfiguration().setObjectAttributes(AuthenticationProvider.class,TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER, newAttributes); + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + _jmxUtils.open(); + + _testUserName = getTestName() + System.currentTimeMillis(); + + _userManagement = _jmxUtils.getUserManagement(TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); + } + + + public void tearDown() throws Exception + { + try + { + if (_jmxUtils != null) + { + _jmxUtils.close(); + } + } + finally + { + super.tearDown(); + } + } + + public void testCreateUser() throws Exception + { + final int initialNumberOfUsers = _userManagement.viewUsers().size(); + assertFileDoesNotContainsPasswordForUser(_testUserName); + + boolean success = _userManagement.createUser(_testUserName, TEST_PASSWORD); + assertTrue("Should have been able to create new user " + _testUserName, success); + assertEquals("Unexpected number of users after add", initialNumberOfUsers + 1, _userManagement.viewUsers().size()); + + assertFileContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginForNewUser() throws Exception + { + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + testCreateUser(); + + assertJmsConnectionSucceeds(_testUserName, TEST_PASSWORD); + } + + public void testDeleteUser() throws Exception + { + final int initialNumberOfUsers = _userManagement.viewUsers().size(); + + testCreateUser(); + + boolean success = _userManagement.deleteUser(_testUserName); + assertTrue("Should have been able to delete new user " + _testUserName, success); + assertEquals("Unexpected number of users after delete", initialNumberOfUsers, _userManagement.viewUsers().size()); + assertFileDoesNotContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginNotPossibleForDeletedUser() throws Exception + { + testDeleteUser(); + + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + } + + public void testSetPassword() throws Exception + { + testCreateUser(); + + _userManagement.setPassword(_testUserName, TEST_NEWPASSWORD); + + assertFileContainsPasswordForUser(_testUserName); + } + + public void testJmsLoginForPasswordChangedUser() throws Exception + { + testSetPassword(); + + assertJmsConnectionSucceeds(_testUserName, TEST_NEWPASSWORD); + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + } + + public void testReload() throws Exception + { + writePasswordFile(_passwordFile, JMXTestUtils.DEFAULT_USERID, JMXTestUtils.DEFAULT_PASSWORD, _testUserName, TEST_PASSWORD); + + assertJmsConnectionFails(_testUserName, TEST_PASSWORD); + + _userManagement.reloadData(); + + assertJmsConnectionSucceeds(_testUserName, TEST_PASSWORD); + } + + public void testGetAuthenticationProviderType() throws Exception + { + String actualType = _userManagement.getAuthenticationProviderType(); + assertEquals("unexpected authentication provider type", getAuthenticationManagerType(), actualType); + } + + protected Passwd createPasswordEncodingUtility() + { + return new Passwd() + { + @Override + public String getOutput(String username, String password) + { + return username + ":" + password; + } + }; + } + + protected String getAuthenticationManagerType() + { + return PlainPasswordDatabaseAuthenticationManager.PROVIDER_TYPE; + } + + private File createTemporaryPasswordFileWithJmxAdminUser() throws Exception + { + File passwordFile = File.createTempFile("passwd", "pwd"); + passwordFile.deleteOnExit(); + writePasswordFile(passwordFile, JMXTestUtils.DEFAULT_USERID, JMXTestUtils.DEFAULT_PASSWORD); + return passwordFile; + } + + private void writePasswordFile(File passwordFile, String... userNamePasswordPairs) throws Exception + { + FileWriter writer = null; + try + { + writer = new FileWriter(passwordFile); + for (int i = 0; i < userNamePasswordPairs.length; i=i+2) + { + String username = userNamePasswordPairs[i]; + String password = userNamePasswordPairs[i+1]; + writer.append(_passwd.getOutput(username, password) + "\n"); + } + } + finally + { + writer.close(); + } + } + + + private void assertFileContainsPasswordForUser(String username) throws IOException + { + assertTrue("Could not find password for user " + username + " within " + _passwordFile, passwordFileContainsUser(username)); + } + + private void assertFileDoesNotContainsPasswordForUser(String username) throws IOException + { + assertFalse("Could not find password for user " + username + " within " + _passwordFile, passwordFileContainsUser(username)); + } + + private boolean passwordFileContainsUser(String username) throws IOException + { + BufferedReader reader = null; + try + { + reader = new BufferedReader(new FileReader(_passwordFile)); + String line = reader.readLine(); + while(line != null) + { + if (line.startsWith(username)) + { + return true; + } + line = reader.readLine(); + } + + return false; + } + finally + { + reader.close(); + } + } + + private void assertJmsConnectionSucceeds(String username, String password) throws Exception + { + Connection connection = getConnection(username, password); + assertNotNull(connection); + } + + private void assertJmsConnectionFails(String username, String password) throws Exception + { + try + { + getConnection(username, password); + fail("Exception not thrown"); + } + catch (JMSException e) + { + // PASS + } + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java new file mode 100644 index 0000000000..ff441169b3 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/UserManagementWithBase64MD5PasswordsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.systest.management.jmx; + +import org.apache.qpid.server.security.auth.manager.Base64MD5PasswordDatabaseAuthenticationManager; +import org.apache.qpid.tools.security.Passwd; + +public class UserManagementWithBase64MD5PasswordsTest extends UserManagementTest +{ + @Override + protected Passwd createPasswordEncodingUtility() + { + return new Passwd(); + } + + @Override + protected String getAuthenticationManagerType() + { + return Base64MD5PasswordDatabaseAuthenticationManager.PROVIDER_TYPE; + } +} |