diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /java/broker/src/test | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
46 files changed, 4535 insertions, 1134 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java b/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java new file mode 100644 index 0000000000..131f316330 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java @@ -0,0 +1,202 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.test.utils.QpidTestCase; + + +public class BrokerOptionsTest extends QpidTestCase +{ + private BrokerOptions _options; + + private static final int TEST_PORT1 = 6789; + private static final int TEST_PORT2 = 6790; + + + protected void setUp() + { + _options = new BrokerOptions(); + } + + public void testDefaultPort() + { + assertEquals(Collections.<Integer>emptySet(), _options.getPorts()); + } + + public void testOverriddenPort() + { + _options.addPort(TEST_PORT1); + assertEquals(Collections.singleton(TEST_PORT1), _options.getPorts()); + } + + public void testManyOverriddenPorts() + { + _options.addPort(TEST_PORT1); + _options.addPort(TEST_PORT2); + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getPorts()); + } + + public void testDuplicateOverriddenPortsAreSilentlyIgnored() + { + _options.addPort(TEST_PORT1); + _options.addPort(TEST_PORT2); + _options.addPort(TEST_PORT1); // duplicate - should be silently ignored + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getPorts()); + } + + public void testDefaultSSLPort() + { + assertEquals(Collections.<Integer>emptySet(), _options.getSSLPorts()); + } + + public void testOverriddenSSLPort() + { + _options.addSSLPort(TEST_PORT1); + assertEquals(Collections.singleton(TEST_PORT1), _options.getSSLPorts()); + } + + public void testManyOverriddenSSLPorts() + { + _options.addSSLPort(TEST_PORT1); + _options.addSSLPort(TEST_PORT2); + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getSSLPorts()); + } + + public void testDuplicateOverriddenSSLPortsAreSilentlyIgnored() + { + _options.addSSLPort(TEST_PORT1); + _options.addSSLPort(TEST_PORT2); + _options.addSSLPort(TEST_PORT1); // duplicate - should be silently ignored + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getSSLPorts()); + } + + public void testDefaultConfigFile() + { + assertNull(_options.getConfigFile()); + } + + public void testOverriddenConfigFile() + { + final String testConfigFile = "etc/mytestconfig.xml"; + _options.setConfigFile(testConfigFile); + assertEquals(testConfigFile, _options.getConfigFile()); + } + + public void testDefaultLogConfigFile() + { + assertNull(_options.getLogConfigFile()); + } + + public void testOverriddenLogConfigFile() + { + final String testLogConfigFile = "etc/mytestlog4j.xml"; + _options.setLogConfigFile(testLogConfigFile); + assertEquals(testLogConfigFile, _options.getLogConfigFile()); + } + + public void testDefaultJmxPortRegistryServer() + { + assertNull(_options.getJmxPortRegistryServer()); + } + + public void testJmxPortRegistryServer() + { + _options.setJmxPortRegistryServer(TEST_PORT1); + assertEquals(Integer.valueOf(TEST_PORT1), _options.getJmxPortRegistryServer()); + } + + public void testDefaultJmxPortConnectorServer() + { + assertNull(_options.getJmxPortConnectorServer()); + } + + public void testJmxPortConnectorServer() + { + _options.setJmxPortConnectorServer(TEST_PORT1); + assertEquals(Integer.valueOf(TEST_PORT1), _options.getJmxPortConnectorServer()); + } + + public void testQpidHomeExposesSysProperty() + { + assertEquals(System.getProperty("QPID_HOME"), _options.getQpidHome()); + } + + public void testDefaultExcludesPortFor0_10() + { + assertEquals(Collections.EMPTY_SET, _options.getExcludedPorts(ProtocolExclusion.v0_10)); + } + + public void testOverriddenExcludesPortFor0_10() + { + _options.addExcludedPort(ProtocolExclusion.v0_10, TEST_PORT1); + assertEquals(Collections.singleton(TEST_PORT1), _options.getExcludedPorts(ProtocolExclusion.v0_10)); + } + + public void testManyOverriddenExcludedPortFor0_10() + { + _options.addExcludedPort(ProtocolExclusion.v0_10, TEST_PORT1); + _options.addExcludedPort(ProtocolExclusion.v0_10, TEST_PORT2); + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getExcludedPorts(ProtocolExclusion.v0_10)); + } + + public void testDuplicatedOverriddenExcludedPortFor0_10AreSilentlyIgnored() + { + _options.addExcludedPort(ProtocolExclusion.v0_10, TEST_PORT1); + _options.addExcludedPort(ProtocolExclusion.v0_10, TEST_PORT2); + final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2})); + assertEquals(expectedPorts, _options.getExcludedPorts(ProtocolExclusion.v0_10)); + } + + public void testDefaultBind() + { + assertNull(_options.getBind()); + } + + public void testOverriddenBind() + { + final String bind = "192.168.0.1"; + _options.setBind(bind); + assertEquals(bind, _options.getBind()); + } + + public void testDefaultLogWatchFrequency() + { + assertEquals(0L, _options.getLogWatchFrequency()); + } + + public void testOverridenLogWatchFrequency() + { + final int myFreq = 10 * 1000; + + _options.setLogWatchFrequency(myFreq); + assertEquals(myFreq, _options.getLogWatchFrequency()); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/MainTest.java b/java/broker/src/test/java/org/apache/qpid/server/MainTest.java new file mode 100644 index 0000000000..9b0ae82b84 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/MainTest.java @@ -0,0 +1,153 @@ +package org.apache.qpid.server; + +import java.util.EnumSet; + +import org.apache.qpid.test.utils.QpidTestCase; + +/** + * Test to verify the command line parsing within the Main class, by + * providing it a series of command line arguments and verifying the + * BrokerOptions emerging for use in starting the Broker instance. + */ +public class MainTest extends QpidTestCase +{ + public void testNoOptionsSpecified() + { + BrokerOptions options = startDummyMain(""); + + assertTrue(options.getPorts().isEmpty()); + assertTrue(options.getSSLPorts().isEmpty()); + assertEquals(null, options.getJmxPortRegistryServer()); + assertEquals(null, options.getConfigFile()); + assertEquals(null, options.getLogConfigFile()); + assertEquals(null, options.getBind()); + + for(ProtocolExclusion pe : EnumSet.allOf(ProtocolExclusion.class)) + { + assertEquals(0, options.getExcludedPorts(pe).size()); + } + } + + public void testPortOverriddenSingle() + { + BrokerOptions options = startDummyMain("-p 1234"); + + assertTrue(options.getPorts().contains(1234)); + assertEquals(1, options.getPorts().size()); + assertTrue(options.getSSLPorts().isEmpty()); + } + + public void testPortOverriddenMultiple() + { + BrokerOptions options = startDummyMain("-p 1234 -p 4321"); + + assertTrue(options.getPorts().contains(1234)); + assertTrue(options.getPorts().contains(4321)); + assertEquals(2, options.getPorts().size()); + assertTrue(options.getSSLPorts().isEmpty()); + } + + public void testSSLPortOverriddenSingle() + { + BrokerOptions options = startDummyMain("-s 5678"); + + assertTrue(options.getSSLPorts().contains(5678)); + assertEquals(1, options.getSSLPorts().size()); + assertTrue(options.getPorts().isEmpty()); + } + + public void testSSLPortOverriddenMultiple() + { + BrokerOptions options = startDummyMain("-s 5678 -s 8765"); + + assertTrue(options.getSSLPorts().contains(5678)); + assertTrue(options.getSSLPorts().contains(8765)); + assertEquals(2, options.getSSLPorts().size()); + assertTrue(options.getPorts().isEmpty()); + } + + public void testNonSSLandSSLPortsOverridden() + { + BrokerOptions options = startDummyMain("-p 5678 -s 8765"); + + assertTrue(options.getPorts().contains(5678)); + assertTrue(options.getSSLPorts().contains(8765)); + assertEquals(1, options.getPorts().size()); + assertEquals(1, options.getSSLPorts().size()); + } + + public void testJmxPortRegistryServerOverridden() + { + BrokerOptions options = startDummyMain("--jmxregistryport 3456"); + + assertEquals(Integer.valueOf(3456), options.getJmxPortRegistryServer()); + + options = startDummyMain("-m 3457"); + assertEquals(Integer.valueOf(3457), options.getJmxPortRegistryServer()); + } + + public void testJmxPortConnectorServerOverridden() + { + BrokerOptions options = startDummyMain("--jmxconnectorport 3456"); + + assertEquals(Integer.valueOf(3456), options.getJmxPortConnectorServer()); + } + + public void testExclude0_10() + { + BrokerOptions options = startDummyMain("-p 3456 --exclude-0-10 3456"); + + assertTrue(options.getPorts().contains(3456)); + assertEquals(1, options.getPorts().size()); + assertTrue(options.getExcludedPorts(ProtocolExclusion.v0_10).contains(3456)); + assertEquals(1, options.getExcludedPorts(ProtocolExclusion.v0_10).size()); + assertEquals(0, options.getExcludedPorts(ProtocolExclusion.v0_9_1).size()); + } + + public void testConfig() + { + BrokerOptions options = startDummyMain("-c abcd/config.xml"); + + assertEquals("abcd/config.xml", options.getConfigFile()); + } + + public void testLogConfig() + { + BrokerOptions options = startDummyMain("-l wxyz/log4j.xml"); + + assertEquals("wxyz/log4j.xml", options.getLogConfigFile()); + } + + public void testLogWatch() + { + BrokerOptions options = startDummyMain("-w 9"); + + assertEquals(9, options.getLogWatchFrequency()); + } + + private BrokerOptions startDummyMain(String commandLine) + { + return (new TestMain(commandLine.split("\\s"))).getOptions(); + } + + private class TestMain extends Main + { + private BrokerOptions _options; + + public TestMain(String[] args) + { + super(args); + } + + @Override + protected void startBroker(BrokerOptions options) + { + _options = options; + } + + public BrokerOptions getOptions() + { + return _options; + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java deleted file mode 100644 index 59543874b4..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server; - -import org.apache.log4j.Logger; -import org.apache.log4j.Level; - -import java.io.InputStream; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.IOException; - -public class RunBrokerWithCommand -{ - public static void main(String[] args) - { - //Start the broker - try - { - String[] fudge = args.clone(); - - // Override the first value which is the command we are going to run later. - fudge[0] = "-v"; - new Main(fudge).startup(); - } - catch (Exception e) - { - System.err.println("Unable to start broker due to: " + e.getMessage()); - - e.printStackTrace(); - exit(1); - } - - Logger.getRootLogger().setLevel(Level.ERROR); - - //run command - try - { - Process task = Runtime.getRuntime().exec(args[0]); - System.err.println("Started Proccess: " + args[0]); - - InputStream inputStream = task.getInputStream(); - - InputStream errorStream = task.getErrorStream(); - - Thread out = new Thread(new Outputter("[OUT]", new BufferedReader(new InputStreamReader(inputStream)))); - Thread err = new Thread(new Outputter("[ERR]", new BufferedReader(new InputStreamReader(errorStream)))); - - out.start(); - err.start(); - - out.join(); - err.join(); - - System.err.println("Waiting for process to exit: " + args[0]); - task.waitFor(); - System.err.println("Done Proccess: " + args[0]); - - } - catch (IOException e) - { - System.err.println("Proccess had problems: " + e.getMessage()); - e.printStackTrace(System.err); - exit(1); - } - catch (InterruptedException e) - { - System.err.println("Proccess had problems: " + e.getMessage()); - e.printStackTrace(System.err); - - exit(1); - } - - - exit(0); - } - - private static void exit(int i) - { - Logger.getRootLogger().setLevel(Level.INFO); - System.exit(i); - } - - static class Outputter implements Runnable - { - - BufferedReader reader; - String prefix; - - Outputter(String s, BufferedReader r) - { - prefix = s; - reader = r; - } - - public void run() - { - String line; - try - { - while ((line = reader.readLine()) != null) - { - System.out.println(prefix + line); - } - } - catch (IOException e) - { - System.out.println("Error occured reading; " + e.getMessage()); - } - } - - } - -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 718874cf69..d22f1e6e94 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -20,742 +20,598 @@ */ package org.apache.qpid.server.configuration; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.List; import java.util.Locale; -import junit.framework.TestCase; - import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolEngine; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.test.utils.QpidTestCase; -public class ServerConfigurationTest extends InternalBrokerBaseCase +public class ServerConfigurationTest extends QpidTestCase { private XMLConfiguration _config = new XMLConfiguration(); + private ServerConfiguration _serverConfig = null; - - public void testSetJMXManagementPort() throws ConfigurationException + @Override + protected void setUp() throws Exception { - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - serverConfig.setJMXManagementPort(23); - assertEquals(23, serverConfig.getJMXManagementPort()); + super.setUp(); + _serverConfig = new ServerConfiguration(_config); + ApplicationRegistry.initialise(new TestApplicationRegistry(_serverConfig)); } - public void testGetJMXManagementPort() throws ConfigurationException + @Override + protected void tearDown() throws Exception { - _config.setProperty("management.jmxport", 42); - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(42, serverConfig.getJMXManagementPort()); + super.tearDown(); + ApplicationRegistry.remove(); } - public void testGetPlatformMbeanserver() throws ConfigurationException + public void testSetJMXPortRegistryServer() throws ConfigurationException { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getPlatformMbeanserver()); - - // Check value we set - _config.setProperty("management.platform-mbeanserver", false); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getPlatformMbeanserver()); + _serverConfig.initialise(); + _serverConfig.setJMXPortRegistryServer(23); + assertEquals(23, _serverConfig.getJMXPortRegistryServer()); } - public void testGetPluginDirectory() throws ConfigurationException + public void testGetJMXPortRegistryServer() throws ConfigurationException { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(null, serverConfig.getPluginDirectory()); - - // Check value we set - _config.setProperty("plugin-directory", "/path/to/plugins"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("/path/to/plugins", serverConfig.getPluginDirectory()); + _config.setProperty(ServerConfiguration.MGMT_JMXPORT_REGISTRYSERVER, 42); + _serverConfig.initialise(); + assertEquals(42, _serverConfig.getJMXPortRegistryServer()); } - public void testGetCacheDirectory() throws ConfigurationException + public void testDefaultJMXPortRegistryServer() throws ConfigurationException { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(null, serverConfig.getCacheDirectory()); - - // Check value we set - _config.setProperty("cache-directory", "/path/to/cache"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("/path/to/cache", serverConfig.getCacheDirectory()); + _serverConfig.initialise(); + assertEquals(8999, _serverConfig.getJMXPortRegistryServer()); } - public void testGetPrincipalDatabaseNames() throws ConfigurationException + public void testSetJMXPortConnectorServer() throws ConfigurationException { - // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getPrincipalDatabaseNames().size()); - - // Check value we set - _config.setProperty("security.principal-databases.principal-database(0).name", "a"); - _config.setProperty("security.principal-databases.principal-database(1).name", "b"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - List<String> dbs = serverConfig.getPrincipalDatabaseNames(); - assertEquals(2, dbs.size()); - assertEquals("a", dbs.get(0)); - assertEquals("b", dbs.get(1)); + serverConfig.setJMXPortConnectorServer(67); + assertEquals(67, serverConfig.getJMXConnectorServerPort()); } - public void testGetPrincipalDatabaseClass() throws ConfigurationException + public void testGetJMXPortConnectorServer() throws ConfigurationException { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getPrincipalDatabaseClass().size()); - - // Check value we set - _config.setProperty("security.principal-databases.principal-database(0).class", "a"); - _config.setProperty("security.principal-databases.principal-database(1).class", "b"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - List<String> dbs = serverConfig.getPrincipalDatabaseClass(); - assertEquals(2, dbs.size()); - assertEquals("a", dbs.get(0)); - assertEquals("b", dbs.get(1)); - } - - public void testGetPrincipalDatabaseAttributeNames() throws ConfigurationException - { - // Check default + _config.setProperty(ServerConfiguration.MGMT_JMXPORT_CONNECTORSERVER, 67); ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getPrincipalDatabaseAttributeNames(1).size()); - - // Check value we set - _config.setProperty("security.principal-databases.principal-database(0).attributes(0).attribute.name", "a"); - _config.setProperty("security.principal-databases.principal-database(0).attributes(1).attribute.name", "b"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - List<String> dbs = serverConfig.getPrincipalDatabaseAttributeNames(0); - assertEquals(2, dbs.size()); - assertEquals("a", dbs.get(0)); - assertEquals("b", dbs.get(1)); + assertEquals(67, serverConfig.getJMXConnectorServerPort()); } - public void testGetPrincipalDatabaseAttributeValues() throws ConfigurationException + public void testDefaultJMXPortConnectorServer() throws ConfigurationException { - // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getPrincipalDatabaseAttributeValues(1).size()); - - // Check value we set - _config.setProperty("security.principal-databases.principal-database(0).attributes(0).attribute.value", "a"); - _config.setProperty("security.principal-databases.principal-database(0).attributes(1).attribute.value", "b"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - List<String> dbs = serverConfig.getPrincipalDatabaseAttributeValues(0); - assertEquals(2, dbs.size()); - assertEquals("a", dbs.get(0)); - assertEquals("b", dbs.get(1)); + assertEquals(ServerConfiguration.DEFAULT_JMXPORT_REGISTRYSERVER + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET, + serverConfig.getJMXConnectorServerPort()); } - public void testGetManagementAccessList() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getManagementAccessList().size()); - - // Check value we set - _config.setProperty("security.jmx.access(0)", "a"); - _config.setProperty("security.jmx.access(1)", "b"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - List<String> dbs = serverConfig.getManagementAccessList(); - assertEquals(2, dbs.size()); - assertEquals("a", dbs.get(0)); - assertEquals("b", dbs.get(1)); - } - - public void testGetFrameSize() throws ConfigurationException + public void testGetPlatformMbeanserver() throws ConfigurationException { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(65536, serverConfig.getFrameSize()); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getPlatformMbeanserver()); // Check value we set - _config.setProperty("advanced.framesize", "23"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getFrameSize()); + _config.setProperty("management.platform-mbeanserver", false); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getPlatformMbeanserver()); } - public void testGetProtectIOEnabled() throws ConfigurationException + public void testGetPluginDirectory() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getProtectIOEnabled()); + _serverConfig.initialise(); + assertEquals(null, _serverConfig.getPluginDirectory()); // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_ENABLED, true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getProtectIOEnabled()); + _config.setProperty("plugin-directory", "/path/to/plugins"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("/path/to/plugins", _serverConfig.getPluginDirectory()); } - public void testGetBufferReadLimit() throws ConfigurationException + public void testGetCacheDirectory() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(262144, serverConfig.getBufferReadLimit()); + _serverConfig.initialise(); + assertEquals(null, _serverConfig.getCacheDirectory()); // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getBufferReadLimit()); + _config.setProperty("cache-directory", "/path/to/cache"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("/path/to/cache", _serverConfig.getCacheDirectory()); } - public void testGetBufferWriteLimit() throws ConfigurationException + public void testGetFrameSize() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(262144, serverConfig.getBufferWriteLimit()); + _serverConfig.initialise(); + assertEquals(65536, _serverConfig.getFrameSize()); // Check value we set - _config.setProperty(ServerConfiguration.CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getBufferWriteLimit()); + _config.setProperty("advanced.framesize", "23"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(23, _serverConfig.getFrameSize()); } - public void testGetStatusEnabled() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); + _serverConfig.initialise(); assertEquals(ServerConfiguration.DEFAULT_STATUS_UPDATES.equalsIgnoreCase("on"), - serverConfig.getStatusUpdatesEnabled()); + _serverConfig.getStatusUpdatesEnabled()); // Check disabling we set _config.setProperty(ServerConfiguration.STATUS_UPDATES, "off"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getStatusUpdatesEnabled()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getStatusUpdatesEnabled()); // Check invalid values don't cause error but result in disabled _config.setProperty(ServerConfiguration.STATUS_UPDATES, "Yes Please"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getStatusUpdatesEnabled()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getStatusUpdatesEnabled()); } public void testGetSynchedClocks() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getSynchedClocks()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getSynchedClocks()); // Check value we set _config.setProperty("advanced.synced-clocks", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getSynchedClocks()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getSynchedClocks()); } public void testGetLocale() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); + _serverConfig.initialise(); // The Default is what ever the VMs default is Locale defaultLocale = Locale.getDefault(); - assertEquals(defaultLocale, serverConfig.getLocale()); + assertEquals(defaultLocale, _serverConfig.getLocale()); //Test Language only Locale update = new Locale("es"); _config.setProperty(ServerConfiguration.ADVANCED_LOCALE, "es"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(update, serverConfig.getLocale()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(update, _serverConfig.getLocale()); //Test Language and Country update = new Locale("es","ES"); _config.setProperty(ServerConfiguration.ADVANCED_LOCALE, "es_ES"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(update, serverConfig.getLocale()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(update, _serverConfig.getLocale()); //Test Language and Country and Variant update = new Locale("es","ES", "Traditional_WIN"); _config.setProperty(ServerConfiguration.ADVANCED_LOCALE, "es_ES_Traditional_WIN"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(update, serverConfig.getLocale()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(update, _serverConfig.getLocale()); } public void testGetMsgAuth() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getMsgAuth()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getMsgAuth()); // Check value we set _config.setProperty("security.msg-auth", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getMsgAuth()); - } - - public void testGetJMXPrincipalDatabase() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(null, serverConfig.getJMXPrincipalDatabase()); - - // Check value we set - _config.setProperty("security.jmx.principal-database", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getJMXPrincipalDatabase()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getMsgAuth()); } public void testGetManagementKeyStorePath() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(null, serverConfig.getManagementKeyStorePath()); + _serverConfig.initialise(); + assertEquals(null, _serverConfig.getManagementKeyStorePath()); // Check value we set _config.setProperty("management.ssl.keyStorePath", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getManagementKeyStorePath()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getManagementKeyStorePath()); } public void testGetManagementSSLEnabled() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getManagementSSLEnabled()); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getManagementSSLEnabled()); // Check value we set _config.setProperty("management.ssl.enabled", false); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getManagementSSLEnabled()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getManagementSSLEnabled()); } - public void testGetManagementKeyStorePassword() throws ConfigurationException + public void testGetManagementKeystorePassword() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(null, serverConfig.getManagementKeyStorePassword()); + _serverConfig.initialise(); + assertEquals(null, _serverConfig.getManagementKeyStorePassword()); // Check value we set _config.setProperty("management.ssl.keyStorePassword", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getManagementKeyStorePassword()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getManagementKeyStorePassword()); } public void testGetQueueAutoRegister() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getQueueAutoRegister()); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getQueueAutoRegister()); // Check value we set _config.setProperty("queue.auto_register", false); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getQueueAutoRegister()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getQueueAutoRegister()); } public void testGetManagementEnabled() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getManagementEnabled()); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getManagementEnabled()); // Check value we set _config.setProperty("management.enabled", false); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getManagementEnabled()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getManagementEnabled()); } public void testSetManagementEnabled() throws ConfigurationException { // Check value we set - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - serverConfig.setManagementEnabled(false); - assertEquals(false, serverConfig.getManagementEnabled()); + _serverConfig.initialise(); + _serverConfig.setManagementEnabled(false); + assertEquals(false, _serverConfig.getManagementEnabled()); } public void testGetHeartBeatDelay() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(5, serverConfig.getHeartBeatDelay()); + _serverConfig.initialise(); + assertEquals(5, _serverConfig.getHeartBeatDelay()); // Check value we set _config.setProperty("heartbeat.delay", 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getHeartBeatDelay()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(23, _serverConfig.getHeartBeatDelay()); } public void testGetHeartBeatTimeout() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(2.0, serverConfig.getHeartBeatTimeout()); + _serverConfig.initialise(); + assertEquals(2.0, _serverConfig.getHeartBeatTimeout()); // Check value we set _config.setProperty("heartbeat.timeoutFactor", 2.3); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(2.3, serverConfig.getHeartBeatTimeout()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(2.3, _serverConfig.getHeartBeatTimeout()); } public void testGetMaximumMessageAge() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getMaximumMessageAge()); + _serverConfig.initialise(); + assertEquals(0, _serverConfig.getMaximumMessageAge()); // Check value we set _config.setProperty("maximumMessageAge", 10L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getMaximumMessageAge()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getMaximumMessageAge()); } public void testGetMaximumMessageCount() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getMaximumMessageCount()); + _serverConfig.initialise(); + assertEquals(0, _serverConfig.getMaximumMessageCount()); // Check value we set _config.setProperty("maximumMessageCount", 10L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getMaximumMessageCount()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getMaximumMessageCount()); } public void testGetMaximumQueueDepth() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getMaximumQueueDepth()); + _serverConfig.initialise(); + assertEquals(0, _serverConfig.getMaximumQueueDepth()); // Check value we set _config.setProperty("maximumQueueDepth", 10L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getMaximumQueueDepth()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getMaximumQueueDepth()); } public void testGetMaximumMessageSize() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getMaximumMessageSize()); + _serverConfig.initialise(); + assertEquals(0, _serverConfig.getMaximumMessageSize()); // Check value we set _config.setProperty("maximumMessageSize", 10L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getMaximumMessageSize()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getMaximumMessageSize()); } public void testGetMinimumAlertRepeatGap() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(0, serverConfig.getMinimumAlertRepeatGap()); + _serverConfig.initialise(); + assertEquals(0, _serverConfig.getMinimumAlertRepeatGap()); // Check value we set _config.setProperty("minimumAlertRepeatGap", 10L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getMinimumAlertRepeatGap()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getMinimumAlertRepeatGap()); } public void testGetProcessors() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(4, serverConfig.getProcessors()); + _serverConfig.initialise(); + assertEquals(4, _serverConfig.getConnectorProcessors()); // Check value we set _config.setProperty("connector.processors", 10); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(10, serverConfig.getProcessors()); - } + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(10, _serverConfig.getConnectorProcessors()); + } - public void testGetPort() throws ConfigurationException + public void testGetPorts() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertNotNull(serverConfig.getPorts()); - assertEquals(1, serverConfig.getPorts().size()); - assertEquals(5672, serverConfig.getPorts().get(0)); + _serverConfig.initialise(); + assertNotNull(_serverConfig.getPorts()); + assertEquals(1, _serverConfig.getPorts().size()); + assertEquals(5672, _serverConfig.getPorts().get(0)); // Check value we set _config.setProperty("connector.port", "10"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertNotNull(serverConfig.getPorts()); - assertEquals(1, serverConfig.getPorts().size()); - assertEquals("10", serverConfig.getPorts().get(0)); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertNotNull(_serverConfig.getPorts()); + assertEquals(1, _serverConfig.getPorts().size()); + assertEquals("10", _serverConfig.getPorts().get(0)); } public void testGetBind() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("wildcard", serverConfig.getBind()); + _serverConfig.initialise(); + assertEquals(WILDCARD_ADDRESS, _serverConfig.getBind()); // Check value we set _config.setProperty("connector.bind", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getBind()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getBind()); } public void testGetReceiveBufferSize() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(32767, serverConfig.getReceiveBufferSize()); + _serverConfig.initialise(); + assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, _serverConfig.getReceiveBufferSize()); // Check value we set _config.setProperty("connector.socketReceiveBuffer", "23"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getReceiveBufferSize()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(23, _serverConfig.getReceiveBufferSize()); } public void testGetWriteBufferSize() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(32767, serverConfig.getWriteBufferSize()); + _serverConfig.initialise(); + assertEquals(ServerConfiguration.DEFAULT_BUFFER_SIZE, _serverConfig.getWriteBufferSize()); // Check value we set _config.setProperty("connector.socketWriteBuffer", "23"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getWriteBufferSize()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(23, _serverConfig.getWriteBufferSize()); } public void testGetTcpNoDelay() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getTcpNoDelay()); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getTcpNoDelay()); // Check value we set _config.setProperty("connector.tcpNoDelay", false); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getTcpNoDelay()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getTcpNoDelay()); } public void testGetEnableExecutorPool() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getEnableExecutorPool()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getEnableExecutorPool()); // Check value we set _config.setProperty("advanced.filterchain[@enableExecutorPool]", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getEnableExecutorPool()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getEnableExecutorPool()); } public void testGetEnableSSL() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getEnableSSL()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getEnableSSL()); // Check value we set _config.setProperty("connector.ssl.enabled", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getEnableSSL()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getEnableSSL()); } public void testGetSSLOnly() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getSSLOnly()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getSSLOnly()); // Check value we set _config.setProperty("connector.ssl.sslOnly", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getSSLOnly()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getSSLOnly()); } - public void testGetSSLPort() throws ConfigurationException + public void testGetSSLPorts() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(8672, serverConfig.getSSLPort()); + _serverConfig.initialise(); + assertNotNull(_serverConfig.getSSLPorts()); + assertEquals(1, _serverConfig.getSSLPorts().size()); + assertEquals(ServerConfiguration.DEFAULT_SSL_PORT, _serverConfig.getSSLPorts().get(0)); - // Check value we set - _config.setProperty("connector.ssl.port", 23); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getSSLPort()); - } - - public void testGetKeystorePath() throws ConfigurationException - { - // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("none", serverConfig.getKeystorePath()); // Check value we set - _config.setProperty("connector.ssl.keystorePath", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getKeystorePath()); + _config.setProperty("connector.ssl.port", "10"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertNotNull(_serverConfig.getSSLPorts()); + assertEquals(1, _serverConfig.getSSLPorts().size()); + assertEquals("10", _serverConfig.getSSLPorts().get(0)); } - public void testGetKeystorePassword() throws ConfigurationException + public void testGetConnectorKeystorePath() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("none", serverConfig.getKeystorePassword()); + _serverConfig.initialise(); + assertNull(_serverConfig.getConnectorKeyStorePath()); // Check value we set - _config.setProperty("connector.ssl.keystorePassword", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getKeystorePassword()); + _config.setProperty("connector.ssl.keyStorePath", "a"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getConnectorKeyStorePath()); + + // Ensure we continue to support the old name keystorePath + _config.clearProperty("connector.ssl.keyStorePath"); + _config.setProperty("connector.ssl.keystorePath", "b"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("b", _serverConfig.getConnectorKeyStorePath()); } - public void testGetCertType() throws ConfigurationException + public void testGetConnectorKeystorePassword() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("SunX509", serverConfig.getCertType()); + _serverConfig.initialise(); + assertNull(_serverConfig.getConnectorKeyStorePassword()); // Check value we set - _config.setProperty("connector.ssl.certType", "a"); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals("a", serverConfig.getCertType()); + _config.setProperty("connector.ssl.keyStorePassword", "a"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getConnectorKeyStorePassword()); + + // Ensure we continue to support the old name keystorePassword + _config.clearProperty("connector.ssl.keyStorePassword"); + _config.setProperty("connector.ssl.keystorePassword", "b"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("b", _serverConfig.getConnectorKeyStorePassword()); } - public void testGetQpidNIO() throws ConfigurationException + public void testGetConnectorCertType() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getQpidNIO()); + _serverConfig.initialise(); + assertEquals("SunX509", _serverConfig.getConnectorCertType()); // Check value we set - _config.setProperty("connector.qpidnio", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getQpidNIO()); + _config.setProperty("connector.ssl.certType", "a"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getConnectorCertType()); } public void testGetUseBiasedWrites() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(false, serverConfig.getUseBiasedWrites()); + _serverConfig.initialise(); + assertEquals(false, _serverConfig.getUseBiasedWrites()); // Check value we set _config.setProperty("advanced.useWriteBiasedPool", true); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(true, serverConfig.getUseBiasedWrites()); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals(true, _serverConfig.getUseBiasedWrites()); } - public void testGetHousekeepingExpiredMessageCheckPeriod() throws ConfigurationException + public void testGetHousekeepingCheckPeriod() throws ConfigurationException { // Check default - ServerConfiguration serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(30000, serverConfig.getHousekeepingCheckPeriod()); + _serverConfig.initialise(); + assertEquals(30000, _serverConfig.getHousekeepingCheckPeriod()); // Check value we set - _config.setProperty("housekeeping.expiredMessageCheckPeriod", 23L); - serverConfig = new ServerConfiguration(_config); - serverConfig.initialise(); - assertEquals(23, serverConfig.getHousekeepingCheckPeriod()); - serverConfig.setHousekeepingExpiredMessageCheckPeriod(42L); - assertEquals(42, serverConfig.getHousekeepingCheckPeriod()); + _config.setProperty("housekeeping.checkPeriod", 23L); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + _serverConfig.setHousekeepingCheckPeriod(42L); + assertEquals(42, _serverConfig.getHousekeepingCheckPeriod()); } public void testSingleConfiguration() throws IOException, ConfigurationException @@ -767,7 +623,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.close(); ServerConfiguration conf = new ServerConfiguration(fileA); conf.initialise(); - assertEquals(4235, conf.getSSLPort()); + assertEquals("4235", conf.getSSLPorts().get(0)); } public void testCombinedConfiguration() throws IOException, ConfigurationException @@ -792,19 +648,17 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.close(); out = new FileWriter(fileB); - out.write("<broker><connector><ssl><port>2345</port></ssl><qpidnio>true</qpidnio></connector></broker>"); + out.write("<broker><connector><ssl><port>2345</port></ssl></connector></broker>"); out.close(); ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile()); config.initialise(); - assertEquals(4235, config.getSSLPort()); // From first file, not + assertEquals("4235", config.getSSLPorts().get(0)); // From first file, not // overriden by second assertNotNull(config.getPorts()); assertEquals(1, config.getPorts().size()); assertEquals("2342", config.getPorts().get(0)); // From the first file, not // present in the second - assertEquals(true, config.getQpidNIO()); // From the second file, not - // present in the first } public void testVariableInterpolation() throws Exception @@ -835,9 +689,8 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("<broker>\n"); out.write("\t<management><enabled>false</enabled></management>\n"); out.write("\t<security>\n"); - out.write("\t\t<principal-databases>\n"); + out.write("\t\t<pd-auth-manager>\n"); out.write("\t\t\t<principal-database>\n"); - out.write("\t\t\t\t<name>passwordfile</name>\n"); out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); out.write("\t\t\t\t<attributes>\n"); out.write("\t\t\t\t\t<attribute>\n"); @@ -846,11 +699,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("\t\t\t\t\t</attribute>\n"); out.write("\t\t\t\t</attributes>\n"); out.write("\t\t\t</principal-database>\n"); - out.write("\t\t</principal-databases>\n"); - out.write("\t\t<jmx>\n"); - out.write("\t\t\t<access>/dev/null</access>\n"); - out.write("\t\t\t<principal-database>passwordfile</principal-database>\n"); - out.write("\t\t</jmx>\n"); + out.write("\t\t</pd-auth-manager>\n"); out.write("\t\t<firewall>\n"); out.write("\t\t\t<rule access=\""+ ((allow) ? "allow" : "deny") +"\" network=\"127.0.0.1\"/>"); out.write("\t\t</firewall>\n"); @@ -886,9 +735,8 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("<broker>\n"); out.write("\t<management><enabled>false</enabled></management>\n"); out.write("\t<security>\n"); - out.write("\t\t<principal-databases>\n"); + out.write("\t\t<pd-auth-manager>\n"); out.write("\t\t\t<principal-database>\n"); - out.write("\t\t\t\t<name>passwordfile</name>\n"); out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); out.write("\t\t\t\t<attributes>\n"); out.write("\t\t\t\t\t<attribute>\n"); @@ -897,11 +745,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("\t\t\t\t\t</attribute>\n"); out.write("\t\t\t\t</attributes>\n"); out.write("\t\t\t</principal-database>\n"); - out.write("\t\t</principal-databases>\n"); - out.write("\t\t<jmx>\n"); - out.write("\t\t\t<access>/dev/null</access>\n"); - out.write("\t\t\t<principal-database>passwordfile</principal-database>\n"); - out.write("\t\t</jmx>\n"); + out.write("\t\t</pd-auth-manager>\n"); out.write("\t\t<firewall>\n"); out.write("\t\t\t<rule access=\"allow\" network=\"127.0.0.1\"/>"); out.write("\t\t</firewall>\n"); @@ -992,9 +836,8 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("<broker>\n"); out.write("\t<management><enabled>false</enabled></management>\n"); out.write("\t<security>\n"); - out.write("\t\t<principal-databases>\n"); + out.write("\t\t<pd-auth-manager>\n"); out.write("\t\t\t<principal-database>\n"); - out.write("\t\t\t\t<name>passwordfile</name>\n"); out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); out.write("\t\t\t\t<attributes>\n"); out.write("\t\t\t\t\t<attribute>\n"); @@ -1003,11 +846,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase out.write("\t\t\t\t\t</attribute>\n"); out.write("\t\t\t\t</attributes>\n"); out.write("\t\t\t</principal-database>\n"); - out.write("\t\t</principal-databases>\n"); - out.write("\t\t<jmx>\n"); - out.write("\t\t\t<access>/dev/null</access>\n"); - out.write("\t\t\t<principal-database>passwordfile</principal-database>\n"); - out.write("\t\t</jmx>\n"); + out.write("\t\t</pd-auth-manager>\n"); out.write("\t\t<firewall>\n"); out.write("\t\t\t<rule access=\"allow\" network=\"127.0.0.1\"/>"); out.write("\t\t</firewall>\n"); @@ -1044,8 +883,9 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase writeConfigFile(mainFile, false, true, null, "test"); // Load config + ApplicationRegistry.remove(); ApplicationRegistry reg = new ConfigurationFileApplicationRegistry(mainFile); - ApplicationRegistry.initialise(reg, 1); + ApplicationRegistry.initialise(reg); // Test config VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); @@ -1076,8 +916,9 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase writeVirtualHostsFile(vhostsFile, "test"); // Load config + ApplicationRegistry.remove(); ApplicationRegistry reg = new ConfigurationFileApplicationRegistry(mainFile); - ApplicationRegistry.initialise(reg, 1); + ApplicationRegistry.initialise(reg); // Test config VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); @@ -1110,8 +951,9 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase writeConfigFile(mainFile, false, false, vhostsFile, null); // Load config + ApplicationRegistry.remove(); ApplicationRegistry reg = new ConfigurationFileApplicationRegistry(mainFile); - ApplicationRegistry.initialise(reg, 1); + ApplicationRegistry.initialise(reg); // Test config VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); @@ -1153,9 +995,10 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Load config try - { + { + ApplicationRegistry.remove(); ApplicationRegistry reg = new ConfigurationFileApplicationRegistry(mainFile); - ApplicationRegistry.initialise(reg, 1); + ApplicationRegistry.initialise(reg); fail("Different virtualhost XML configurations not allowed"); } catch (ConfigurationException ce) @@ -1188,8 +1031,9 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Load config try { + ApplicationRegistry.remove(); ApplicationRegistry reg = new ConfigurationFileApplicationRegistry(mainFile); - ApplicationRegistry.initialise(reg, 1); + ApplicationRegistry.initialise(reg); fail("Multiple virtualhost XML configurations not allowed"); } catch (ConfigurationException ce) @@ -1481,4 +1325,106 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase fail("Should throw a ConfigurationException"); } } + + /* + * Tests that the old element security.jmx.access (that used to be used + * to define JMX access rights) is rejected. + */ + public void testManagementAccessRejected() throws ConfigurationException + { + // Check default + _serverConfig.initialise(); + + // Check value we set + _config.setProperty("security.jmx.access(0)", "jmxremote.access"); + _serverConfig = new ServerConfiguration(_config); + + try + { + _serverConfig.initialise(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : security/jmx/access is no longer a supported element within the configuration xml.", + ce.getMessage()); + } + } + + /* + * Tests that the old element security.jmx.principal-database (that used to define the + * principal database used for JMX authentication) is rejected. + */ + public void testManagementPrincipalDatabaseRejected() throws ConfigurationException + { + // Check default + _serverConfig.initialise(); + + // Check value we set + _config.setProperty("security.jmx.principal-database(0)", "mydb"); + _serverConfig = new ServerConfiguration(_config); + + try + { + _serverConfig.initialise(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : security/jmx/principal-database is no longer a supported element within the configuration xml.", + ce.getMessage()); + } + } + + /* + * Tests that the old element security.principal-databases. ... (that used to define + * principal databases) is rejected. + */ + public void testPrincipalDatabasesRejected() throws ConfigurationException + { + _serverConfig.initialise(); + + // Check value we set + _config.setProperty("security.principal-databases.principal-database.class", "myclass"); + _serverConfig = new ServerConfiguration(_config); + + try + { + _serverConfig.initialise(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : security/principal-databases is no longer supported within the configuration xml.", + ce.getMessage()); + } + } + + /* + * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was + * replaced by housekeeping.checkPeriod) is rejected. + */ + public void testExpiredMessageCheckPeriodRejected() throws ConfigurationException + { + _serverConfig.initialise(); + + // Check value we set + _config.setProperty("housekeeping.expiredMessageCheckPeriod", 23L); + _serverConfig = new ServerConfiguration(_config); + + try + { + _serverConfig.initialise(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod.", + ce.getMessage()); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 917755e8a5..b133d53ac5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; @@ -203,5 +205,50 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase } + /** + * Tests that the old element security.authentication.name is rejected. This element + * was never supported properly as authentication is performed before the virtual host + * is considered. + */ + public void testSecurityAuthenticationNameRejected() throws Exception + { + getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name", + "testdb"); + + try + { + super.createBroker(); + fail("Exception not thrown"); + } + catch(ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : security/authentication/name is no longer a supported element within the configuration xml." + + " It appears in virtual host definition : " + getName(), + ce.getMessage()); + } + } + /* + * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was + * replaced by housekeeping.checkPeriod) is rejected. + */ + public void testExpiredMessageCheckPeriodRejected() throws Exception + { + getConfigXml().addProperty("virtualhosts.virtualhost.testExpiredMessageCheckPeriodRejected.housekeeping.expiredMessageCheckPeriod", + 5); + + try + { + super.createBroker(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod." + + " It appears in virtual host definition : " + getName(), + ce.getMessage()); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b58966a4c..3b7f5f3a51 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -52,6 +52,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -276,7 +277,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase static ContentHeaderBody getContentHeader(FieldTable headers) { ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); + header.setProperties(getProperties(headers)); return header; } @@ -428,21 +429,11 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase //To change body of implemented methods use File | Settings | File Templates. } - public void reject(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. @@ -482,6 +473,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + public boolean isDequeued() + { + return false; + } + + public boolean isDispensed() + { + return false; + } }; if(action != null) @@ -565,8 +566,8 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase int pos = 0; for(ContentBody body : bodies) { - storedMessage.addContent(pos, body.payload.duplicate().buf()); - pos += body.payload.limit(); + storedMessage.addContent(pos, ByteBuffer.wrap(body._payload)); + pos += body._payload.length; } _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f72961c03c..403a290a0f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -396,7 +396,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase IncomingMessage message = new IncomingMessage(info); final ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - chb.properties = props; + chb.setProperties(props); message.setContentHeaderBody(chb); diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/UnitTestMessageLogger.java b/java/broker/src/test/java/org/apache/qpid/server/logging/UnitTestMessageLogger.java index 3752dcb37e..e8defd0e58 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/UnitTestMessageLogger.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/UnitTestMessageLogger.java @@ -28,11 +28,7 @@ import org.apache.qpid.server.logging.AbstractRootMessageLogger; public class UnitTestMessageLogger extends AbstractRootMessageLogger { - List<Object> _log; - - { - _log = new LinkedList<Object>(); - } + private final List<Object> _log = new LinkedList<Object>(); public UnitTestMessageLogger() { @@ -69,4 +65,14 @@ public class UnitTestMessageLogger extends AbstractRootMessageLogger { _log.clear(); } + + public boolean messageContains(final int index, final String contains) + { + if (index + 1 > _log.size()) + { + throw new IllegalArgumentException("Message with index " + index + " has not been logged"); + } + final String message = _log.get(index).toString(); + return message.contains(contains); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java index 033ae3b4b3..d6b790db01 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/actors/ManagementActorTest.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.AMQException; - +import java.security.PrivilegedAction; +import java.util.Collections; import java.util.List; +import javax.management.remote.JMXPrincipal; +import javax.security.auth.Subject; + /** * Test : AMQPManagementActorTest * Validate the AMQPManagementActor class. @@ -96,8 +96,40 @@ public class ManagementActorTest extends BaseActorTestCase // Verify that the message has the right values assertTrue("Message contains the [mng: prefix", - logs.get(0).toString().contains("[mng:" + CONNECTION_ID + "(" + IP + ")")); + logs.get(0).toString().contains("[mng:N/A(" + IP + ")")); + } + + /** + * Tests appearance of principal name in log message + */ + public void testSubjectPrincipalNameAppearance() + { + Subject subject = new Subject(true, Collections.singleton(new JMXPrincipal("guest")), Collections.EMPTY_SET, + Collections.EMPTY_SET); + + final String message = Subject.doAs(subject, new PrivilegedAction<String>() + { + public String run() + { + return sendTestLogMessage(_amqpActor); + } + }); + + // Verify that the log message was created + assertNotNull("Test log message is not created!", message); + + List<Object> logs = _rawLogger.getLogMessages(); + + // Verify that at least one log message was added to log + assertEquals("Message log size not as expected.", 1, logs.size()); + + String logMessage = logs.get(0).toString(); + + // Verify that the logged message is present in the output + assertTrue("Message was not found in log message", logMessage.contains(message)); + // Verify that the message has the right principal value + assertTrue("Message contains the [mng: prefix", logMessage.contains("[mng:guest(" + IP + ")")); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java index 728a98e009..4364376000 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java @@ -66,7 +66,6 @@ public class ExchangeMessagesTest extends AbstractTestMessages validateLogMessage(log, "EXH-1001", expected); } - public void testExchangeDeleted() { _logMessage = ExchangeMessages.DELETED(); @@ -77,4 +76,21 @@ public class ExchangeMessagesTest extends AbstractTestMessages validateLogMessage(log, "EXH-1002", expected); } + public void testExchangeDiscardedMessage() + { + // Get the Default Exchange on the Test Vhost for testing + final Exchange exchange = ApplicationRegistry.getInstance(). + getVirtualHostRegistry().getVirtualHost("test"). + getExchangeRegistry().getDefaultExchange(); + + final String name = exchange.getNameShortString().toString(); + final String routingKey = "routingKey"; + + _logMessage = ExchangeMessages.DISCARDMSG(name, routingKey); + List<Object> log = performLog(); + + String[] expected = {"Discarded Message :","Name:", name, "Routing Key:", routingKey}; + + validateLogMessage(log, "EXH-1003", expected); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/management/AMQUserManagementMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/management/AMQUserManagementMBeanTest.java index a6c17e042e..f3ee2707b0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/management/AMQUserManagementMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/management/AMQUserManagementMBeanTest.java @@ -21,22 +21,26 @@ package org.apache.qpid.server.management; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; -import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; -import org.apache.commons.configuration.ConfigurationException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + + +import org.apache.commons.lang.NotImplementedException; +import org.apache.qpid.management.common.mbeans.UserManagement; import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean; import org.apache.qpid.server.util.InternalBrokerBaseCase; -/* Note: The main purpose is to test the jmx access rights file manipulation - * within AMQUserManagementMBean. The Principal Databases are tested by their own tests, - * this test just exercises their usage in AMQUserManagementMBean. +/** + * + * Tests the AMQUserManagementMBean and its interaction with the PrincipalDatabase. + * */ public class AMQUserManagementMBeanTest extends InternalBrokerBaseCase { @@ -44,7 +48,6 @@ public class AMQUserManagementMBeanTest extends InternalBrokerBaseCase private AMQUserManagementMBean _amqumMBean; private File _passwordFile; - private File _accessFile; private static final String TEST_USERNAME = "testuser"; private static final String TEST_PASSWORD = "password"; @@ -57,7 +60,6 @@ public class AMQUserManagementMBeanTest extends InternalBrokerBaseCase _database = new PlainPasswordFilePrincipalDatabase(); _amqumMBean = new AMQUserManagementMBean(); loadFreshTestPasswordFile(); - loadFreshTestAccessFile(); } @Override @@ -65,142 +67,67 @@ public class AMQUserManagementMBeanTest extends InternalBrokerBaseCase { //clean up test password/access files File _oldPasswordFile = new File(_passwordFile.getAbsolutePath() + ".old"); - File _oldAccessFile = new File(_accessFile.getAbsolutePath() + ".old"); _oldPasswordFile.delete(); - _oldAccessFile.delete(); _passwordFile.delete(); - _accessFile.delete(); super.tearDown(); } public void testDeleteUser() { - loadFreshTestPasswordFile(); - loadFreshTestAccessFile(); + assertEquals("Unexpected number of users before test", 1,_amqumMBean.viewUsers().size()); + assertTrue("Delete should return true to flag successful delete", _amqumMBean.deleteUser(TEST_USERNAME)); + assertEquals("Unexpected number of users after test", 0,_amqumMBean.viewUsers().size()); + } + + public void testDeleteUserWhereUserDoesNotExist() + { + assertEquals("Unexpected number of users before test", 1,_amqumMBean.viewUsers().size()); + assertFalse("Delete should return false to flag unsuccessful delete", _amqumMBean.deleteUser("made.up.username")); + assertEquals("Unexpected number of users after test", 1,_amqumMBean.viewUsers().size()); - //try deleting a non existant user - assertFalse(_amqumMBean.deleteUser("made.up.username")); - - assertTrue(_amqumMBean.deleteUser(TEST_USERNAME)); } - public void testDeleteUserIsSavedToAccessFile() + public void testCreateUser() { - loadFreshTestPasswordFile(); - loadFreshTestAccessFile(); - - assertTrue(_amqumMBean.deleteUser(TEST_USERNAME)); - - //check the access rights were actually deleted from the file - try{ - BufferedReader reader = new BufferedReader(new FileReader(_accessFile)); - - //check the 'generated by' comment line is present - assertTrue("File has no content", reader.ready()); - assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " + - "AMQUserManagementMBean Console : Last edited by user:")); + assertEquals("Unexpected number of users before test", 1,_amqumMBean.viewUsers().size()); + assertTrue("Create should return true to flag successful create", _amqumMBean.createUser("newuser", "mypass")); + assertEquals("Unexpected number of users before test", 2,_amqumMBean.viewUsers().size()); + } - //there should also be a modified date/time comment line - assertTrue("File has no modified date/time comment line", reader.ready()); - assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#")); - - //the access file should not contain any further data now as we just deleted the only user - assertFalse("User access data was present when it should have been deleted", reader.ready()); - } - catch (IOException e) - { - fail("Unable to valdate file contents due to:" + e.getMessage()); - } - + public void testCreateUserWhereUserAlreadyExists() + { + assertEquals("Unexpected number of users before test", 1,_amqumMBean.viewUsers().size()); + assertFalse("Create should return false to flag unsuccessful create", _amqumMBean.createUser(TEST_USERNAME, "mypass")); + assertEquals("Unexpected number of users before test", 1,_amqumMBean.viewUsers().size()); } - public void testSetRights() + public void testSetPassword() { - loadFreshTestPasswordFile(); - loadFreshTestAccessFile(); - - assertFalse(_amqumMBean.setRights("made.up.username", true, false, false)); - - assertTrue(_amqumMBean.setRights(TEST_USERNAME, true, false, false)); - assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, true, false)); - assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true)); + assertTrue("Set password should return true to flag successful change", _amqumMBean.setPassword(TEST_USERNAME, "newpassword")); } - public void testSetRightsIsSavedToAccessFile() + public void testSetPasswordWhereUserDoesNotExist() { - loadFreshTestPasswordFile(); - loadFreshTestAccessFile(); - - assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true)); - - //check the access rights were actually updated in the file - try{ - BufferedReader reader = new BufferedReader(new FileReader(_accessFile)); - - //check the 'generated by' comment line is present - assertTrue("File has no content", reader.ready()); - assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " + - "AMQUserManagementMBean Console : Last edited by user:")); - - //there should also be a modified date/time comment line - assertTrue("File has no modified date/time comment line", reader.ready()); - assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#")); - - //the access file should not contain any further data now as we just deleted the only user - assertTrue("User access data was not updated in the access file", - reader.readLine().equals(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.ADMIN)); - - //the access file should not contain any further data now as we just deleted the only user - assertFalse("Additional user access data was present when there should be no more", reader.ready()); - } - catch (IOException e) - { - fail("Unable to valdate file contents due to:" + e.getMessage()); - } + assertFalse("Set password should return false to flag successful change", _amqumMBean.setPassword("made.up.username", "newpassword")); } - public void testSetAccessFileWithMissingFile() + public void testViewUsers() { - try - { - _amqumMBean.setAccessFile("made.up.filename"); - } - catch (IOException e) - { - fail("Should not have been an IOE." + e.getMessage()); - } - catch (ConfigurationException e) - { - assertTrue(e.getMessage(), e.getMessage().endsWith("does not exist")); - } - } - - public void testSetAccessFileWithReadOnlyFile() - { - File testFile = null; - try - { - testFile = File.createTempFile(this.getClass().getName(),".access.readonly"); - BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(testFile, false)); - passwordWriter.write(TEST_USERNAME + ":" + TEST_PASSWORD); - passwordWriter.newLine(); - passwordWriter.flush(); - passwordWriter.close(); + TabularData userList = _amqumMBean.viewUsers(); - testFile.setReadOnly(); - _amqumMBean.setAccessFile(testFile.getPath()); - } - catch (IOException e) - { - fail("Access file was not created." + e.getMessage()); - } - catch (ConfigurationException e) - { - fail("There should not have been a configuration exception." + e.getMessage()); - } - - testFile.delete(); + assertNotNull(userList); + assertEquals("Unexpected number of users in user list", 1, userList.size()); + assertTrue(userList.containsKey(new Object[]{TEST_USERNAME})); + + // Check the deprecated read, write and admin items continue to exist but return false. + CompositeData userRec = userList.get(new Object[]{TEST_USERNAME}); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_READ_ONLY)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_READ_ONLY)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_READ_WRITE)); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_READ_WRITE)); + assertTrue(userRec.containsKey(UserManagement.RIGHTS_ADMIN)); + assertEquals(false, userRec.get(UserManagement.RIGHTS_ADMIN)); } // ============================ Utility methods ========================= @@ -227,37 +154,4 @@ public class AMQUserManagementMBeanTest extends InternalBrokerBaseCase fail("Unable to create test password file: " + e.getMessage()); } } - - private void loadFreshTestAccessFile() - { - try - { - if(_accessFile == null) - { - _accessFile = File.createTempFile(this.getClass().getName(),".access"); - } - - BufferedWriter accessWriter = new BufferedWriter(new FileWriter(_accessFile,false)); - accessWriter.write("#Last Updated By comment"); - accessWriter.newLine(); - accessWriter.write("#Date/time comment"); - accessWriter.newLine(); - accessWriter.write(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.READONLY); - accessWriter.newLine(); - accessWriter.flush(); - accessWriter.close(); - } - catch (IOException e) - { - fail("Unable to create test access file: " + e.getMessage()); - } - - try{ - _amqumMBean.setAccessFile(_accessFile.toString()); - } - catch (Exception e) - { - fail("Unable to set access file: " + e.getMessage()); - } - } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/plugins/MockPluginManager.java b/java/broker/src/test/java/org/apache/qpid/server/plugins/MockPluginManager.java deleted file mode 100644 index a64ec5d3b1..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/plugins/MockPluginManager.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.plugins; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.exchange.ExchangeType; -import org.apache.qpid.server.security.SecurityPluginFactory; - -public class MockPluginManager extends PluginManager -{ - private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>(); - private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new HashMap<List<String>, ConfigurationPluginFactory>(); - - public MockPluginManager(String pluginPath, String cachePath) throws Exception - { - super(pluginPath, cachePath); - } - - @Override - public Map<String, ExchangeType<?>> getExchanges() - { - return null; - } - - @Override - public Map<String, SecurityPluginFactory> getSecurityPlugins() - { - return _securityPlugins; - } - - @Override - public Map<List<String>, ConfigurationPluginFactory> getConfigurationPlugins() - { - return _configPlugins; - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java b/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java new file mode 100644 index 0000000000..4a03445357 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugins; + +import java.util.Map; +import java.util.TreeMap; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.osgi.framework.Version; + +/** + * + */ +public class OsgiSystemPackageUtilTest extends QpidTestCase +{ + private OsgiSystemPackageUtil _util = null; // Object under test + + private Map<String, String> _map = new TreeMap<String, String>(); // Use a TreeMap for unit test in order for determinstic results. + + public void testWithOnePackage() throws Exception + { + _map.put("org.xyz", "1.0.0"); + + _util = new OsgiSystemPackageUtil(null, _map); + + final String systemPackageString = _util.getFormattedSystemPackageString(); + + assertEquals("org.xyz; version=1.0.0", systemPackageString); + } + + public void testWithTwoPackages() throws Exception + { + _map.put("org.xyz", "1.0.0"); + _map.put("org.abc", "1.2.3"); + + _util = new OsgiSystemPackageUtil(null, _map); + + final String systemPackageString = _util.getFormattedSystemPackageString(); + + assertEquals("org.abc; version=1.2.3, org.xyz; version=1.0.0", systemPackageString); + } + + public void testWithNoPackages() throws Exception + { + _util = new OsgiSystemPackageUtil(null, _map); + + final String systemPackageString = _util.getFormattedSystemPackageString(); + + assertNull(systemPackageString); + } + + public void testWithQpidPackageWithQpidReleaseNumberSet() throws Exception + { + _map.put("org.apache.qpid.xyz", "1.0.0"); + _map.put("org.abc", "1.2.3"); + + _util = new OsgiSystemPackageUtil(new Version("0.13"), _map); + + final String systemPackageString = _util.getFormattedSystemPackageString(); + + assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.13.0", systemPackageString); + } + + public void testWithQpidPackageWithoutQpidReleaseNumberSet() throws Exception + { + _map.put("org.apache.qpid.xyz", "1.0.0"); + _map.put("org.abc", "1.2.3"); + + _util = new OsgiSystemPackageUtil(null, _map); + + final String systemPackageString = _util.getFormattedSystemPackageString(); + + assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=1.0.0", systemPackageString); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java b/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java index 8c18ab85b0..8c945aabfb 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/plugins/PluginTest.java @@ -49,7 +49,7 @@ public class PluginTest extends InternalBrokerBaseCase public void testNoExchanges() throws Exception { - PluginManager manager = new PluginManager("/path/to/nowhere", "/tmp"); + PluginManager manager = new PluginManager("/path/to/nowhere", "/tmp", null); Map<String, ExchangeType<?>> exchanges = manager.getExchanges(); assertTrue("Exchanges found", exchanges.isEmpty()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 3b6cd37ea9..5a411c6807 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -20,9 +20,16 @@ */ package org.apache.qpid.server.protocol; -import java.security.Principal; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -30,35 +37,31 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { // ChannelID(LIST) -> LinkedList<Pair> final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; private AtomicInteger _deliveryCount = new AtomicInteger(0); + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { - super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver()); + super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection(), ID_GENERATOR.getAndIncrement()); _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); // Need to authenticate session for it to be representative testing. - setAuthorizedID(new Principal() - { - public String getName() - { - return "InternalTestProtocolSession"; - } - }); + setAuthorizedSubject(new Subject(true, Collections.singleton(new UsernamePrincipal("InternalTestProtocolSession")), + Collections.EMPTY_SET, Collections.EMPTY_SET)); setVirtualHost(virtualHost); } @@ -193,7 +196,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr return _closed; } - public void closeProtocolSession(boolean waitLast) + public void closeProtocolSession() { // Override as we don't have a real IOSession to close. // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java new file mode 100644 index 0000000000..9d76d5efca --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -0,0 +1,146 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol; + +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.TestNetworkConnection; + +public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + + //the factory needs a registry instance + ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new XMLConfiguration()))); + } + + protected void tearDown() + { + //the factory opens a registry instance + ApplicationRegistry.remove(); + } + + private static final byte[] AMQP_0_8_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 8, + (byte) 0 + }; + + private static final byte[] AMQP_0_9_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 9 + }; + + private static final byte[] AMQP_0_9_1_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 0, + (byte) 9, + (byte) 1 + }; + + + private static final byte[] AMQP_0_10_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 10 + }; + + private byte[] getAmqpHeader(final AmqpProtocolVersion version) + { + switch(version) + { + case v0_8: + return AMQP_0_8_HEADER; + case v0_9: + return AMQP_0_9_HEADER; + case v0_9_1: + return AMQP_0_9_1_HEADER; + case v0_10: + return AMQP_0_10_HEADER; + default: + fail("unknown AMQP version, appropriate header must be added for new protocol version"); + return null; + } + } + + /** + * Test to verify that connections established using a MultiVersionProtocolEngine are assigned + * IDs from a common sequence, independent of the protocol version under use. + */ + public void testDifferentProtocolVersionsShareCommonIDNumberingSequence() + { + Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); + + MultiVersionProtocolEngineFactory factory = + new MultiVersionProtocolEngineFactory("localhost", versions); + + //create a dummy to retrieve the 'current' ID number + long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId(); + + //create a protocol engine and send the AMQP header for all supported AMQP verisons, + //ensuring the ID assigned increases as expected + for(AmqpProtocolVersion version : versions) + { + long expectedID = previousId + 1; + byte[] header = getAmqpHeader(version); + assertNotNull("protocol header should not be null", header); + + ServerProtocolEngine engine = factory.newProtocolEngine(new TestNetworkConnection()); + assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId()); + + //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent + engine.received(ByteBuffer.wrap(header)); + assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId()); + + previousId = expectedID; + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index d52f4c03f3..3961b3b355 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest AMQMessage msg = super.createMessage(id); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; + msg.getContentHeaderBody().setProperties(props); return msg; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 0707cab3d5..47b8b7eb18 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -277,7 +276,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - contentHeaderBody.properties = props; + contentHeaderBody.setProperties(props); contentHeaderBody.bodySize = size; // in bytes IncomingMessage message = new IncomingMessage(publish); message.setContentHeaderBody(contentHeaderBody); @@ -289,7 +288,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase protected void configure() { // Increase Alert Check period - getConfiguration().setHousekeepingExpiredMessageCheckPeriod(200); + getConfiguration().setHousekeepingCheckPeriod(200); } private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException @@ -312,18 +311,14 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase { messages[i].addContentBodyFrame(new ContentChunk(){ - ByteBuffer _data = ByteBuffer.allocate((int)size); - - { - _data.limit((int)size); - } + byte[] _data = new byte[(int)size]; public int getSize() { return (int) size; } - public ByteBuffer getData() + public byte[] getData() { return _data; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 5b72cfac40..070d105805 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.mina.common.ByteBuffer; import javax.management.JMException; @@ -275,18 +274,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase msg.addContentBodyFrame(new ContentChunk() { - ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE); - - { - _data.limit((int)MESSAGE_SIZE); - } + byte[] _data = new byte[((int)MESSAGE_SIZE)]; public int getSize() { return (int) MESSAGE_SIZE; } - public ByteBuffer getData() + public byte[] getData() { return _data; } @@ -402,8 +397,8 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); IncomingMessage msg = new IncomingMessage(publish); msg.setContentHeaderBody(contentHeaderBody); return msg; @@ -441,8 +436,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase getSession().getMethodRegistry() .getProtocolVersionMethodConverter() .convertToContentChunk( - new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), - MESSAGE_SIZE))); + new ContentBody(new byte[(int) MESSAGE_SIZE]))); AMQMessage m = new AMQMessage(currentMessage.getStoredMessage()); for(BaseQueue q : currentMessage.getDestinationQueues()) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 04608275a3..0f5374b3e5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -126,7 +126,7 @@ public class AckTest extends InternalBrokerBaseCase //IncomingMessage msg2 = null; BasicContentHeaderProperties b = new BasicContentHeaderProperties(); ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; + cb.setProperties(b); if (persistent) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 888a16053c..4c31092983 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.security.PrincipalHolder; +import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; @@ -48,7 +48,7 @@ public class MockAMQQueue implements AMQQueue private AMQShortString _name; private VirtualHost _virtualhost; - private PrincipalHolder _principalHolder; + private AuthorizationHolder _authorizationHolder; private AMQSessionModel _exclusiveOwner; private AMQShortString _owner; @@ -536,14 +536,14 @@ public class MockAMQQueue implements AMQQueue return null; //To change body of implemented methods use File | Settings | File Templates. } - public PrincipalHolder getPrincipalHolder() + public AuthorizationHolder getAuthorizationHolder() { - return _principalHolder; + return _authorizationHolder; } - public void setPrincipalHolder(PrincipalHolder principalHolder) + public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) { - _principalHolder = principalHolder; + _authorizationHolder = authorizationHolder; } public AMQSessionModel getExclusiveOwningSession() diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 5bdbe2c68e..ab8850c18c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -139,7 +139,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; @@ -153,13 +153,6 @@ public class MockQueueEntry implements QueueEntry } - public void reject(Subscription subscription) - { - - - } - - public void release() { @@ -231,4 +224,14 @@ public class MockQueueEntry implements QueueEntry _message = msg; } + public boolean isDequeued() + { + return false; + } + + public boolean isDispensed() + { + return false; + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java new file mode 100644 index 0000000000..d8afd8d829 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import java.lang.reflect.Field; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry.EntryState; +import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.subscription.Subscription; + +/** + * Tests for {@link QueueEntryImpl} + * + */ +public class QueueEntryImplTest extends TestCase +{ + // tested entry + private QueueEntryImpl _queueEntry; + + public void setUp() throws Exception + { + AMQMessage message = new MockAMQMessage(1); + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + _queueEntry = new QueueEntryImpl(queueEntryList, message, 1); + } + + public void testAquire() + { + assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", + _queueEntry.isAvailable()); + acquire(); + } + + public void testDequeue() + { + dequeue(); + } + + public void testDelete() + { + delete(); + } + + /** + * Tests release method for entry in acquired state. + * <p> + * Entry in state ACQUIRED should be released and its status should be + * changed to AVAILABLE. + */ + public void testReleaseAquired() + { + acquire(); + _queueEntry.release(); + assertTrue("Queue entry should be in AVAILABLE state after invoking of release method", + _queueEntry.isAvailable()); + } + + /** + * Tests release method for entry in dequeued state. + * <p> + * Invoking release on dequeued entry should not have any effect on its + * state. + */ + public void testReleaseDequeued() + { + dequeue(); + _queueEntry.release(); + EntryState state = getState(); + assertEquals("Invoking of release on entry in DEQUEUED state should not have any effect", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * Tests release method for entry in deleted state. + * <p> + * Invoking release on deleted entry should not have any effect on its + * state. + */ + public void testReleaseDeleted() + { + delete(); + _queueEntry.release(); + assertTrue("Invoking of release on entry in DELETED state should not have any effect", + _queueEntry.isDeleted()); + } + + /** + * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. + */ + public void testGetNext() + { + int numberOfEntries = 5; + QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); + entries[i] = entry; + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries ; i++) + { + QueueEntryImpl queueEntry = entries[i]; + QueueEntryImpl next = queueEntry.getNext(); + if (i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + QueueEntryImpl next = entries[0].getNext(); + assertEquals("expected forth entry",entries[3], next); + next = next.getNext(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNext(); + assertNull("The next entry after the last should be null", next); + } + /** + * A helper method to put tested object into deleted state and assert the state + */ + private void delete() + { + _queueEntry.delete(); + assertTrue("Queue entry should be in DELETED state after invoking of delete method", + _queueEntry.isDeleted()); + } + + /** + * A helper method to put tested entry into dequeue state and assert the sate + */ + private void dequeue() + { + acquire(); + _queueEntry.dequeue(); + EntryState state = getState(); + assertEquals("Queue entry should be in DEQUEUED state after invoking of dequeue method", + QueueEntry.DEQUEUED_STATE, state); + } + + /** + * A helper method to put tested entry into acquired state and assert the sate + */ + private void acquire() + { + _queueEntry.acquire(new MockSubscription()); + assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", + _queueEntry.isAcquired()); + } + + /** + * A helper method to get entry state + * + * @return entry state + */ + private EntryState getState() + { + EntryState state = null; + try + { + Field f = QueueEntryImpl.class.getDeclaredField("_state"); + f.setAccessible(true); + state = (EntryState) f.get(_queueEntry); + } + catch (Exception e) + { + fail("Failure to get a state field: " + e.getMessage()); + } + return state; + } + + /** + * Tests rejecting a queue entry records the Subscription ID + * for later verification by isRejectedBy(subscriptionId). + */ + public void testRejectAndRejectedBy() + { + Subscription sub = new MockSubscription(); + long subId = sub.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired()); + + //acquire, reject, and release the message using the subscription + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub)); + _queueEntry.reject(); + _queueEntry.release(); + + //verify the rejection is recorded + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + + //repeat rejection using a second subscription + Subscription sub2 = new MockSubscription(); + long sub2Id = sub2.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); + _queueEntry.reject(); + + //verify it still records being rejected by both subscriptions + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 67d093d00a..f4cdbbe02c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -36,13 +36,16 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; +import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -51,6 +54,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -102,7 +107,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(); PropertiesConfiguration env = new PropertiesConfiguration(); - _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store); + _virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), new VirtualHostConfiguration(getClass().getName(), env), _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, false, _virtualHost, _arguments); @@ -227,10 +232,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a re-queued message is resent to the subscriber. Verifies also that the + * Tests that a released queue entry is resent to the subscriber. Verifies also that the * QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testRequeuedMessageIsResentToSubscriber() throws Exception + public void testReleasedMessageIsResentToSubscriber() throws Exception { _queue.registerSubscription(_subscription, false); @@ -253,19 +258,18 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now requeue the first message only */ + /* Now release the first message only, causing it to be requeued */ queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -275,11 +279,11 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that a re-queued message that becomes expired is not resent to the subscriber. + * Tests that a released message that becomes expired is not resent to the subscriber. * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. */ - public void testRequeuedMessageThatBecomesExpiredIsNotRedelivered() throws Exception + public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -301,17 +305,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); int subFlushWaitTime = 150; - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - /* Wait a little more to be sure that message will have expired, then requeue it */ + /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner Thread + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size()); @@ -321,12 +324,12 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } /** - * Tests that if a client requeues messages 'out of order' (the order + * Tests that if a client releases entries 'out of order' (the order * used by QueueEntryImpl.compareTo) that messages are still resent * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()} * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. */ - public void testMessagesRequeuedOutOfComparableOrderAreDelivered() throws Exception + public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception { _queue.registerSubscription(_subscription, false); @@ -349,21 +352,19 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageB, postEnqueueAction); _queue.enqueue(messageC, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - /* Now requeue the third and first message only */ + /* Now release the third and first message only, causing it to be requeued */ queueEntries.get(2).release(); queueEntries.get(0).release(); - _queue.requeue(queueEntries.get(2)); - _queue.requeue(queueEntries.get(0)); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); @@ -374,10 +375,10 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase /** - * Tests a requeue for a queue with multiple subscriptions. Verifies that a + * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testRequeueForQueueWithMultipleSubscriptions() throws Exception + public void testReleaseForQueueWithMultipleSubscriptions() throws Exception { MockSubscription subscription1 = new MockSubscription(); MockSubscription subscription2 = new MockSubscription(); @@ -402,66 +403,16 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase _queue.enqueue(messageA, postEnqueueAction); _queue.enqueue(messageB, postEnqueueAction); - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size()); - /* Now requeue a message (for any subscription) */ + /* Now release the first message only, causing it to be requeued */ + queueEntries.get(0).release(); - queueEntries.get(0).release(); - _queue.requeue((QueueEntryImpl)queueEntries.get(0)); - - Thread.sleep(150); // Work done by SubFlushRunner Thread + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to all subscriptions after requeue", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); - } - - /** - * Tests a requeue for a queue with multiple subscriptions. Verifies that a - * subscriber specific requeue resends the message to <i>that</i> subscriber. - */ - public void testSubscriptionSpecificRequeueForQueueWithMultipleSubscriptions() throws Exception - { - MockSubscription subscription1 = new MockSubscription(); - MockSubscription subscription2 = new MockSubscription(); - - _queue.registerSubscription(subscription1, false); - _queue.registerSubscription(subscription2, false); - - final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); - PostEnqueueAction postEnqueueAction = new PostEnqueueAction() - { - public void onEnqueue(QueueEntry entry) - { - queueEntries.add(entry); - } - }; - - AMQMessage messageA = createMessage(new Long(24)); - AMQMessage messageB = createMessage(new Long(25)); - - /* Enqueue two messages */ - - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - - Thread.sleep(150); // Work done by SubFlushRunner Thread - - assertEquals("Unexpected total number of messages sent to subscription1 after enqueue", 1, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after enqueue", 1, subscription2.getMessages().size()); - - /* Now requeue a message (for first subscription) */ - - queueEntries.get(0).release(); - _queue.requeue((QueueEntryImpl)queueEntries.get(0), subscription1); - - Thread.sleep(150); // Work done by SubFlushRunner Thread - - assertEquals("Unexpected total number of messages sent to subscription1 after requeue", 2, subscription1.getMessages().size()); - assertEquals("Unexpected total number of messages sent to subscription2 after requeue", 1, subscription2.getMessages().size()); + assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry); assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry); } @@ -660,8 +611,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Create IncomingMessage and nondurable queue final IncomingMessage msg = new IncomingMessage(info); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>(); @@ -707,6 +658,635 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } + /** + * processQueue() is used when asynchronously delivering messages to + * subscriptions which could not be delivered immediately during the + * enqueue() operation. + * + * A defect within the method would mean that delivery of these messages may + * not occur should the Runner stop before all messages have been processed. + * Such a defect was discovered when Selectors were used such that one and + * only one subscription can/will accept any given messages, but multiple + * subscriptions are present, and one of the earlier subscriptions receives + * more messages than the others. + * + * This test is to validate that the processQueue() method is able to + * correctly deliver all of the messages present for asynchronous delivery + * to subscriptions in such a scenario. + */ + public void testProcessQueueWithUniqueSelectors() throws Exception + { + TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); + SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false, + false, _virtualHost, factory, null) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing, i.e prevent deliveries by the SubFlushRunner + // when registering the new subscriptions + } + }; + + // retrieve the QueueEntryList the queue creates and insert the test + // messages, thus avoiding straight-through delivery attempts during + //enqueue() process. + QueueEntryList list = factory.getQueueEntryList(); + assertNotNull("QueueEntryList should have been created", list); + + QueueEntry msg1 = list.add(createMessage(1L)); + QueueEntry msg2 = list.add(createMessage(2L)); + QueueEntry msg3 = list.add(createMessage(3L)); + QueueEntry msg4 = list.add(createMessage(4L)); + QueueEntry msg5 = list.add(createMessage(5L)); + + // Create lists of the entries each subscription should be interested + // in.Bias over 50% of the messages to the first subscription so that + // the later subscriptions reject them and report being done before + // the first subscription as the processQueue method proceeds. + List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); + List<QueueEntry> msgListSub2 = createEntriesList(msg4); + List<QueueEntry> msgListSub3 = createEntriesList(msg5); + + MockSubscription sub1 = new MockSubscription(msgListSub1); + MockSubscription sub2 = new MockSubscription(msgListSub2); + MockSubscription sub3 = new MockSubscription(msgListSub3); + + // register the subscriptions + testQueue.registerSubscription(sub1, false); + testQueue.registerSubscription(sub2, false); + testQueue.registerSubscription(sub3, false); + + //check that no messages have been delivered to the + //subscriptions during registration + assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); + + // call processQueue to deliver the messages + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + @Override + public void run() + { + // we dont actually want/need this runner to do any work + // because we we are already doing it! + } + }); + + // check expected messages delivered to correct consumers + verifyRecievedMessages(msgListSub1, sub1.getMessages()); + verifyRecievedMessages(msgListSub2, sub2.getMessages()); + verifyRecievedMessages(msgListSub3, sub3.getMessages()); + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue()} + */ + public void testGetMessagesOnTheQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)} + */ + public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue with filter accepting all available messages + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() + { + public boolean accept(QueueEntry entry) + { + return true; + } + + public boolean filterComplete() + { + return false; + } + }); + + // assert entries on the queue + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage) entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not copied as part of invocation of + * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testCopyMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // copy messages into another queue + _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not moved as part of invocation of + * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, StoreContext)} + */ + public void testMovedMessagesWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + String anotherQueueName = "testQueue2"; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // create another queue + SimpleAMQQueue queue = createQueue(anotherQueueName); + + // create transaction + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + // move messages into another queue + _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn); + + // commit transaction + txn.commit(); + + // get messages on another queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + + // assert another queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ((AMQMessage)entries.get(i).getMessage()).getMessageId(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not been copied into another queue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that messages in given range including dequeued one are deleted + * from the queue on invocation of + * {@link SimpleAMQQueue#removeMessagesFromQueue(long, long, StoreContext)} + */ + public void testRemoveMessagesFromQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // remove messages + _queue.removeMessagesFromQueue(0, messageNumber); + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Queue should be empty", 0, entries.size()); + } + + /** + * Tests that dequeued message on the top is not accounted and next message + * is deleted from the queue on invocation of + * {@link SimpleAMQQueue#deleteMessageFromTop(StoreContext)} + */ + public void testDeleteMessageFromTopWithDequeuedEntryOnTop() + { + int messageNumber = 4; + int dequeueMessageIndex = 0; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on top + dequeueMessage(_queue, dequeueMessageIndex); + + //delete message from top + _queue.deleteMessageFromTop(); + + //get queue netries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull("Null is returned from getMessagesOnTheQueue", entries); + assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), + messageNumber - 2, entries.size()); + assertEquals("Expected first entry with id 2", new Long(2), + ((AMQMessage) entries.get(0).getMessage()).getMessageId()); + } + + /** + * Tests that all messages including dequeued one are deleted from the queue + * on invocation of {@link SimpleAMQQueue#clearQueue(StoreContext)} + */ + public void testClearQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on a test queue + dequeueMessage(_queue, dequeueMessageIndex); + + // clean queue + try + { + _queue.clearQueue(); + } + catch (AMQException e) + { + fail("Failure to clear queue:" + e.getMessage()); + } + + // get queue entries + List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull(entries); + assertEquals(0, entries.size()); + } + + /** + * Tests whether dequeued entry is sent to subscriber in result of + * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} + */ + public void testProcessQueueWithDequeuedEntry() + { + // total number of messages to send + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // create queue with overridden method deliverAsync + SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, null) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing + } + }; + + // put messages + List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); + + // dequeue message + dequeueMessage(testQueue, dequeueMessageIndex); + + // latch to wait for message receipt + final CountDownLatch latch = new CountDownLatch(messageNumber -1); + + // create a subscription + MockSubscription subscription = new MockSubscription() + { + /** + * Send a message and decrement latch + */ + public void send(QueueEntry msg) throws AMQException + { + super.send(msg); + latch.countDown(); + } + }; + + try + { + // subscribe + testQueue.registerSubscription(subscription, false); + + // process queue + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + public void run() + { + // do nothing + } + }); + } + catch (AMQException e) + { + fail("Failure to process queue:" + e.getMessage()); + } + // wait up to 1 minute for message receipt + try + { + latch.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3)); + verifyRecievedMessages(expected, subscription.getMessages()); + } + + /** + * Tests that entry in dequeued state are not enqueued and not delivered to subscription + */ + public void testEqueueDequeuedEntry() + { + // create a queue where each even entry is considered a dequeued + SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), + false, false, _virtualHost, new QueueEntryListFactory() + { + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + /** + * Override SimpleQueueEntryList to create a dequeued + * entries for messages with even id + */ + return new SimpleQueueEntryList(queue) + { + /** + * Entries with even message id are considered + * dequeued! + */ + protected QueueEntryImpl createQueueEntry(final ServerMessage message) + { + return new QueueEntryImpl(this, message) + { + public boolean isDequeued() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isDispensed() + { + return (((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + + public boolean isAvailable() + { + return !(((AMQMessage) message).getMessageId().longValue() % 2 == 0); + } + }; + } + }; + } + }, null); + // create a subscription + MockSubscription subscription = new MockSubscription(); + + // register subscription + try + { + queue.registerSubscription(subscription, false); + } + catch (AMQException e) + { + fail("Failure to register subscription:" + e.getMessage()); + } + + // put test messages into a queue + putGivenNumberOfMessages(queue, 4); + + // assert received messages + List<QueueEntry> messages = subscription.getMessages(); + assertEquals("Only 2 messages should be returned", 2, messages.size()); + assertEquals("ID of first message should be 1", new Long(1), + ((AMQMessage) messages.get(0).getMessage()).getMessageId()); + assertEquals("ID of second message should be 3", new Long(3), + ((AMQMessage) messages.get(1).getMessage()).getMessageId()); + } + + /** + * A helper method to create a queue with given name + * + * @param name + * queue name + * @return queue + */ + private SimpleAMQQueue createQueue(String name) + { + SimpleAMQQueue queue = null; + try + { + AMQShortString queueName = new AMQShortString(name); + AMQShortString ownerName = new AMQShortString(name + "Owner"); + queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(queueName, false, ownerName, false, false, + _virtualHost, _arguments); + } + catch (AMQException e) + { + fail("Failure to create a queue:" + e.getMessage()); + } + assertNotNull("Queue was not created", queue); + return queue; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * All messages are asserted that they are present on queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + */ + private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + putGivenNumberOfMessages(queue, messageNumber); + + // make sure that all enqueued messages are on the queue + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + assertEquals(messageNumber, entries.size()); + for (int i = 0; i < messageNumber; i++) + { + assertEquals(new Long(i), ((AMQMessage)entries.get(i).getMessage()).getMessageId()); + } + return entries; + } + + /** + * A helper method to put given number of messages into queue + * <p> + * Queue is not checked if messages are added into queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + * @param queue + * @param messageNumber + */ + private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) + { + for (int i = 0; i < messageNumber; i++) + { + // Create message + Long messageId = new Long(i); + AMQMessage message = null; + try + { + message = createMessage(messageId); + } + catch (AMQException e) + { + fail("Failure to create a test message:" + e.getMessage()); + } + // Put message on queue + try + { + queue.enqueue(message); + } + catch (AMQException e) + { + fail("Failure to put message on queue:" + e.getMessage()); + } + } + } + + /** + * A helper method to dequeue an entry on queue with given index + * + * @param queue + * queue to dequeue message on + * @param dequeueMessageIndex + * entry index to dequeue. + */ + private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) + { + List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + QueueEntry entry = entries.get(dequeueMessageIndex); + entry.acquire(); + entry.dequeue(); + assertTrue(entry.isDequeued()); + return entry; + } + + private List<QueueEntry> createEntriesList(QueueEntry... entries) + { + ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); + for (QueueEntry entry : entries) + { + entriesList.add(entry); + } + return entriesList; + } + + private void verifyRecievedMessages(List<QueueEntry> expected, + List<QueueEntry> delivered) + { + assertEquals("Consumer did not receive the expected number of messages", + expected.size(), delivered.size()); + + for (QueueEntry msg : expected) + { + assertTrue("Consumer did not recieve msg: " + + msg.getMessage().getMessageNumber(), delivered.contains(msg)); + } + } + public class TestMessage extends AMQMessage { private final long _tag; @@ -747,4 +1327,20 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase AMQMessage messageA = new TestMessage(id, id, info); return messageA; } + + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory + { + QueueEntryList _list; + + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + _list = new SimpleQueueEntryList(queue); + return _list; + } + + public QueueEntryList getQueueEntryList() + { + return _list; + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 320a75045a..7136f07ca5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import junit.framework.TestCase; @@ -155,5 +156,55 @@ public class SimpleQueueEntryListTest extends TestCase assertEquals("Count should have been equal",count,remainingMessages.size()); } - + + public void testDequedMessagedNotPresentInIterator() + { + int numberOfMessages = 10; + SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + QueueEntry[] entries = new QueueEntry[numberOfMessages]; + + for(int i = 0; i < numberOfMessages ; i++) + { + AMQMessage message = null;; + try + { + message = new MockAMQMessage(i); + } + catch (AMQException e) + { + fail("Failure to create a mock message:" + e.getMessage()); + } + QueueEntry entry = entryList.add(message); + assertNotNull("QE should not be null", entry); + entries[i]= entry; + } + + // dequeue all even messages + for (QueueEntry queueEntry : entries) + { + long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue(); + if (i%2 == 0) + { + queueEntry.acquire(); + queueEntry.dequeue(); + } + } + + // iterate and check that dequeued messages are not returned by iterator + QueueEntryIterator it = entryList.iterator(); + int counter = 0; + int i = 1; + while (it.advance()) + { + QueueEntry entry = it.getNode(); + Long id = ((AMQMessage)entry.getMessage()).getMessageId(); + assertEquals("Expected message with id " + i + " but got message with id " + + id, new Long(i), id); + counter++; + i += 2; + } + int expectedNumber = numberOfMessages / 2; + assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter, + expectedNumber, counter); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManagerTest.java new file mode 100644 index 0000000000..b10442d7db --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManagerTest.java @@ -0,0 +1,358 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.manager; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.security.Provider; +import java.security.Security; + +import javax.security.auth.Subject; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * + * Tests the public methods of PrincipalDatabaseAuthenticationManager. + * + */ +public class PrincipalDatabaseAuthenticationManagerTest extends InternalBrokerBaseCase +{ + private AuthenticationManager _manager = null; // Class under test + private String TEST_USERNAME = "guest"; + private String TEST_PASSWORD = "guest"; + + /** + * @see org.apache.qpid.server.util.InternalBrokerBaseCase#tearDown() + */ + @Override + public void tearDown() throws Exception + { + super.tearDown(); + if (_manager != null) + { + _manager.close(); + } + } + + /** + * @see org.apache.qpid.server.util.InternalBrokerBaseCase#setUp() + */ + @Override + public void setUp() throws Exception + { + super.setUp(); + + final String passwdFilename = createPasswordFile().getCanonicalPath(); + final ConfigurationPlugin config = getConfig(PlainPasswordFilePrincipalDatabase.class.getName(), + "passwordFile", passwdFilename); + + _manager = PrincipalDatabaseAuthenticationManager.FACTORY.newInstance(config); + } + + /** + * Tests where the case where the config specifies a PD implementation + * that is not found. + */ + public void testPrincipalDatabaseImplementationNotFound() throws Exception + { + try + { + _manager = PrincipalDatabaseAuthenticationManager.FACTORY.newInstance(getConfig("not.Found", null, null)); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + // PASS + } + } + + /** + * Tests where the case where the config specifies a PD implementation + * of the wrong type. + */ + public void testPrincipalDatabaseImplementationWrongType() throws Exception + { + try + { + _manager = PrincipalDatabaseAuthenticationManager.FACTORY.newInstance(getConfig(String.class.getName(), null, null)); // Not a PrincipalDatabase implementation + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + // PASS + } + } + + /** + * Tests the case where a setter with the desired name cannot be found. + */ + public void testPrincipalDatabaseSetterNotFound() throws Exception + { + try + { + _manager = PrincipalDatabaseAuthenticationManager.FACTORY.newInstance(getConfig(PlainPasswordFilePrincipalDatabase.class.getName(), "noMethod", "test")); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + // PASS + } + } + + /** + * QPID-1347. Make sure the exception message and stack trace is reasonable for an absent password file. + */ + public void testPrincipalDatabaseThrowsSetterFileNotFound() throws Exception + { + try + { + _manager = PrincipalDatabaseAuthenticationManager.FACTORY.newInstance(getConfig(PlainPasswordFilePrincipalDatabase.class.getName(), "passwordFile", "/not/found")); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + // PASS + assertNotNull("Expected an underlying cause", ce.getCause()); + assertEquals(FileNotFoundException.class, ce.getCause().getClass()); + } + } + + /** + * Tests that the PDAM registers SASL mechanisms correctly with the runtime. + */ + public void testRegisteredMechanisms() throws Exception + { + assertNotNull(_manager.getMechanisms()); + // relies on those mechanisms attached to PropertiesPrincipalDatabaseManager + assertEquals("AMQPLAIN PLAIN CRAM-MD5", _manager.getMechanisms()); + + Provider qpidProvider = Security.getProvider(PrincipalDatabaseAuthenticationManager.PROVIDER_NAME); + assertNotNull(qpidProvider); + } + + /** + * Tests that the SASL factory method createSaslServer correctly + * returns a non-null implementation. + */ + public void testSaslMechanismCreation() throws Exception + { + SaslServer server = _manager.createSaslServer("CRAM-MD5", "localhost"); + assertNotNull(server); + // Merely tests the creation of the mechanism. Mechanisms themselves are tested + // by their own tests. + } + + /** + * Tests that the authenticate method correctly interprets an + * authentication success. + * + */ + public void testSaslAuthenticationSuccess() throws Exception + { + SaslServer testServer = createTestSaslServer(true, false); + + AuthenticationResult result = _manager.authenticate(testServer, "12345".getBytes()); + final Subject subject = result.getSubject(); + assertTrue(subject.getPrincipals().contains(new UsernamePrincipal("guest"))); + assertEquals(AuthenticationStatus.SUCCESS, result.getStatus()); + } + + /** + * + * Tests that the authenticate method correctly interprets an + * authentication not complete. + * + */ + public void testSaslAuthenticationNotCompleted() throws Exception + { + SaslServer testServer = createTestSaslServer(false, false); + + AuthenticationResult result = _manager.authenticate(testServer, "12345".getBytes()); + assertNull(result.getSubject()); + assertEquals(AuthenticationStatus.CONTINUE, result.getStatus()); + } + + /** + * + * Tests that the authenticate method correctly interprets an + * authentication error. + * + */ + public void testSaslAuthenticationError() throws Exception + { + SaslServer testServer = createTestSaslServer(false, true); + + AuthenticationResult result = _manager.authenticate(testServer, "12345".getBytes()); + assertNull(result.getSubject()); + assertEquals(AuthenticationStatus.ERROR, result.getStatus()); + } + + /** + * Tests that the authenticate method correctly interprets an + * authentication success. + * + */ + public void testNonSaslAuthenticationSuccess() throws Exception + { + AuthenticationResult result = _manager.authenticate("guest", "guest"); + final Subject subject = result.getSubject(); + assertFalse("Subject should not be set read-only", subject.isReadOnly()); + assertTrue(subject.getPrincipals().contains(new UsernamePrincipal("guest"))); + assertEquals(AuthenticationStatus.SUCCESS, result.getStatus()); + } + + /** + * Tests that the authenticate method correctly interprets an + * authentication success. + * + */ + public void testNonSaslAuthenticationNotCompleted() throws Exception + { + AuthenticationResult result = _manager.authenticate("guest", "wrongpassword"); + assertNull(result.getSubject()); + assertEquals(AuthenticationStatus.CONTINUE, result.getStatus()); + } + + /** + * Tests the ability to de-register the provider. + */ + public void testClose() throws Exception + { + assertEquals("AMQPLAIN PLAIN CRAM-MD5", _manager.getMechanisms()); + assertNotNull(Security.getProvider(PrincipalDatabaseAuthenticationManager.PROVIDER_NAME)); + + _manager.close(); + + // Check provider has been removed. + assertNull(_manager.getMechanisms()); + assertNull(Security.getProvider(PrincipalDatabaseAuthenticationManager.PROVIDER_NAME)); + _manager = null; + } + + /** + * Test SASL implementation used to test the authenticate() method. + */ + private SaslServer createTestSaslServer(final boolean complete, final boolean throwSaslException) + { + return new SaslServer() + { + public String getMechanismName() + { + return null; + } + + public byte[] evaluateResponse(byte[] response) throws SaslException + { + if (throwSaslException) + { + throw new SaslException("Mocked exception"); + } + return null; + } + + public boolean isComplete() + { + return complete; + } + + public String getAuthorizationID() + { + return complete ? "guest" : null; + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + return null; + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + return null; + } + + public Object getNegotiatedProperty(String propName) + { + return null; + } + + public void dispose() throws SaslException + { + } + }; + } + + private ConfigurationPlugin getConfig(final String clazz, final String argName, final String argValue) throws Exception + { + final ConfigurationPlugin config = new PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("pd-auth-manager.principal-database.class", clazz); + + if (argName != null) + { + xmlconfig.addProperty("pd-auth-manager.principal-database.attributes.attribute.name", argName); + xmlconfig.addProperty("pd-auth-manager.principal-database.attributes.attribute.value", argValue); + } + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + config.setConfiguration("security", xmlconfig); + return config; + } + + private File createPasswordFile() throws Exception + { + BufferedWriter writer = null; + try + { + File testFile = File.createTempFile(this.getClass().getName(),"tmp"); + testFile.deleteOnExit(); + + writer = new BufferedWriter(new FileWriter(testFile)); + writer.write(TEST_USERNAME + ":" + TEST_PASSWORD); + writer.newLine(); + + return testFile; + + } + finally + { + if (writer != null) + { + writer.close(); + } + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java index e8c24da68d..6dc7b19d3d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticatorTest.java @@ -20,188 +20,125 @@ */ package org.apache.qpid.server.security.auth.rmi; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; import java.util.Collections; import javax.management.remote.JMXPrincipal; import javax.security.auth.Subject; - -import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; -import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; import junit.framework.TestCase; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; + +/** + * Tests the RMIPasswordAuthenticator and its collaboration with the AuthenticationManager. + * + */ public class RMIPasswordAuthenticatorTest extends TestCase { private final String USERNAME = "guest"; private final String PASSWORD = "guest"; - private final String B64_MD5HASHED_PASSWORD = "CE4DQ6BIb/BVMN9scFyLtA=="; private RMIPasswordAuthenticator _rmipa; - - private Base64MD5PasswordFilePrincipalDatabase _md5Pd; - private File _md5PwdFile; - - private PlainPasswordFilePrincipalDatabase _plainPd; - private File _plainPwdFile; - - private Subject testSubject; + private String[] _credentials; protected void setUp() throws Exception { _rmipa = new RMIPasswordAuthenticator(); - _md5Pd = new Base64MD5PasswordFilePrincipalDatabase(); - _md5PwdFile = createTempPasswordFile(this.getClass().getName()+"md5pwd", USERNAME, B64_MD5HASHED_PASSWORD); - _md5Pd.setPasswordFile(_md5PwdFile.getAbsolutePath()); - - _plainPd = new PlainPasswordFilePrincipalDatabase(); - _plainPwdFile = createTempPasswordFile(this.getClass().getName()+"plainpwd", USERNAME, PASSWORD); - _plainPd.setPasswordFile(_plainPwdFile.getAbsolutePath()); - - testSubject = new Subject(true, + _credentials = new String[] {USERNAME, PASSWORD}; + } + + /** + * Tests a successful authentication. Ensures that a populated read-only subject it returned. + */ + public void testAuthenticationSuccess() + { + final Subject expectedSubject = new Subject(true, Collections.singleton(new JMXPrincipal(USERNAME)), Collections.EMPTY_SET, Collections.EMPTY_SET); - } - - private File createTempPasswordFile(String filenamePrefix, String user, String password) - { - try - { - File testFile = File.createTempFile(filenamePrefix,"tmp"); - testFile.deleteOnExit(); - - BufferedWriter writer = new BufferedWriter(new FileWriter(testFile)); - writer.write(user + ":" + password); - writer.newLine(); + _rmipa.setAuthenticationManager(createTestAuthenticationManager(true, null)); - writer.flush(); - writer.close(); - return testFile; - } - catch (IOException e) - { - fail("Unable to create temporary test password file." + e.getMessage()); - } + Subject newSubject = _rmipa.authenticate(_credentials); + assertTrue("Subject must be readonly", newSubject.isReadOnly()); + assertTrue("Returned subject does not equal expected value", + newSubject.equals(expectedSubject)); - return null; } - - - //********** Test Methods *********// - - public void testAuthenticate() + /** + * Tests a unsuccessful authentication. + */ + public void testUsernameOrPasswordInvalid() { - String[] credentials; - Subject newSubject; - - // Test when no PD has been set - try - { - credentials = new String[]{USERNAME, PASSWORD}; - newSubject = _rmipa.authenticate(credentials); - fail("SecurityException expected due to lack of principal database"); - } - catch (SecurityException se) - { - assertEquals("Unexpected exception message", - RMIPasswordAuthenticator.UNABLE_TO_LOOKUP, se.getMessage()); - } - - //The PrincipalDatabase's are tested primarily by their own tests, but - //minimal tests are done here to exercise their usage in this area. + _rmipa.setAuthenticationManager(createTestAuthenticationManager(false, null)); - // Test correct passwords are verified with an MD5 PD - try - { - _rmipa.setPrincipalDatabase(_md5Pd); - credentials = new String[]{USERNAME, PASSWORD}; - newSubject = _rmipa.authenticate(credentials); - assertTrue("Returned subject does not equal expected value", - newSubject.equals(testSubject)); - } - catch (Exception e) - { - fail("Unexpected Exception:" + e.getMessage()); - } - - // Test incorrect passwords are not verified with an MD5 PD try { - credentials = new String[]{USERNAME, PASSWORD+"incorrect"}; - newSubject = _rmipa.authenticate(credentials); - fail("SecurityException expected due to incorrect password"); + _rmipa.authenticate(_credentials); + fail("Exception not thrown"); } catch (SecurityException se) { assertEquals("Unexpected exception message", RMIPasswordAuthenticator.INVALID_CREDENTIALS, se.getMessage()); - } - - // Test non-existent accounts are not verified with an MD5 PD - try - { - credentials = new String[]{USERNAME+"invalid", PASSWORD}; - newSubject = _rmipa.authenticate(credentials); - fail("SecurityException expected due to non-existant account"); - } - catch (SecurityException se) - { - assertEquals("Unexpected exception message", - RMIPasswordAuthenticator.INVALID_CREDENTIALS, se.getMessage()); - } - // Test correct passwords are verified with a Plain PD - try - { - _rmipa.setPrincipalDatabase(_plainPd); - credentials = new String[]{USERNAME, PASSWORD}; - newSubject = _rmipa.authenticate(credentials); - assertTrue("Returned subject does not equal expected value", - newSubject.equals(testSubject)); - } - catch (Exception e) - { - fail("Unexpected Exception"); } + } + + /** + * Tests case where authentication system itself fails. + */ + public void testAuthenticationFailure() + { + final Exception mockAuthException = new Exception("Mock Auth system failure"); + _rmipa.setAuthenticationManager(createTestAuthenticationManager(false, mockAuthException)); - // Test incorrect passwords are not verified with a Plain PD try { - credentials = new String[]{USERNAME, PASSWORD+"incorrect"}; - newSubject = _rmipa.authenticate(credentials); - fail("SecurityException expected due to incorrect password"); + _rmipa.authenticate(_credentials); + fail("Exception not thrown"); } catch (SecurityException se) { - assertEquals("Unexpected exception message", - RMIPasswordAuthenticator.INVALID_CREDENTIALS, se.getMessage()); + assertEquals("Initial cause not found", mockAuthException, se.getCause()); } - - // Test non-existent accounts are not verified with an Plain PD + } + + + /** + * Tests case where authentication manager is not set. + */ + public void testNullAuthenticationManager() + { try { - credentials = new String[]{USERNAME+"invalid", PASSWORD}; - newSubject = _rmipa.authenticate(credentials); - fail("SecurityException expected due to non existant account"); + _rmipa.authenticate(_credentials); + fail("SecurityException expected due to lack of authentication manager"); } catch (SecurityException se) { assertEquals("Unexpected exception message", - RMIPasswordAuthenticator.INVALID_CREDENTIALS, se.getMessage()); + RMIPasswordAuthenticator.UNABLE_TO_LOOKUP, se.getMessage()); } + } + /** + * Tests case where arguments are non-Strings.. + */ + public void testWithNonStringArrayArgument() + { // Test handling of non-string credential's + final Object[] objCredentials = new Object[]{USERNAME, PASSWORD}; try { - Object[] objCredentials = new Object[]{USERNAME, PASSWORD}; - newSubject = _rmipa.authenticate(objCredentials); + _rmipa.authenticate(objCredentials); fail("SecurityException expected due to non string[] credentials"); } catch (SecurityException se) @@ -209,12 +146,18 @@ public class RMIPasswordAuthenticatorTest extends TestCase assertEquals("Unexpected exception message", RMIPasswordAuthenticator.SHOULD_BE_STRING_ARRAY, se.getMessage()); } - - // Test handling of incorrect number of credential's + } + + /** + * Tests case where there are too many, too few or null arguments. + */ + public void testWithIllegalNumberOfArguments() + { + // Test handling of incorrect number of credentials try { - credentials = new String[]{USERNAME, PASSWORD, PASSWORD}; - newSubject = _rmipa.authenticate(credentials); + _credentials = new String[]{USERNAME, PASSWORD, PASSWORD}; + _rmipa.authenticate(_credentials); fail("SecurityException expected due to supplying wrong number of credentials"); } catch (SecurityException se) @@ -223,12 +166,12 @@ public class RMIPasswordAuthenticatorTest extends TestCase RMIPasswordAuthenticator.SHOULD_HAVE_2_ELEMENTS, se.getMessage()); } - // Test handling of null credential's + // Test handling of null credentials try { //send a null array - credentials = null; - newSubject = _rmipa.authenticate(credentials); + _credentials = null; + _rmipa.authenticate(_credentials); fail("SecurityException expected due to not supplying an array of credentials"); } catch (SecurityException se) @@ -240,8 +183,8 @@ public class RMIPasswordAuthenticatorTest extends TestCase try { //send a null password - credentials = new String[]{USERNAME, null}; - newSubject = _rmipa.authenticate(credentials); + _credentials = new String[]{USERNAME, null}; + _rmipa.authenticate(_credentials); fail("SecurityException expected due to sending a null password"); } catch (SecurityException se) @@ -253,8 +196,8 @@ public class RMIPasswordAuthenticatorTest extends TestCase try { //send a null username - credentials = new String[]{null, PASSWORD}; - newSubject = _rmipa.authenticate(credentials); + _credentials = new String[]{null, PASSWORD}; + _rmipa.authenticate(_credentials); fail("SecurityException expected due to sending a null username"); } catch (SecurityException se) @@ -264,4 +207,54 @@ public class RMIPasswordAuthenticatorTest extends TestCase } } + private AuthenticationManager createTestAuthenticationManager(final boolean successfulAuth, final Exception exception) + { + return new AuthenticationManager() + { + public void configure(ConfigurationPlugin config) + { + throw new UnsupportedOperationException(); + } + + public void initialise() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + throw new UnsupportedOperationException(); + } + + public String getMechanisms() + { + throw new UnsupportedOperationException(); + } + + public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException + { + throw new UnsupportedOperationException(); + } + + public AuthenticationResult authenticate(SaslServer server, byte[] response) + { + throw new UnsupportedOperationException(); + } + + public AuthenticationResult authenticate(String username, String password) + { + if (exception != null) { + return new AuthenticationResult(AuthenticationStatus.ERROR, exception); + } + else if (successfulAuth) + { + return new AuthenticationResult(new Subject()); + } + else + { + return new AuthenticationResult(AuthenticationStatus.CONTINUE); + } + } + }; + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java new file mode 100644 index 0000000000..86e4e23750 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java @@ -0,0 +1,228 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.security.auth.sasl; + +import java.io.File; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.Principal; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.security.auth.login.AccountNotFoundException; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import junit.framework.TestCase; + +import org.apache.commons.codec.binary.Hex; +import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexInitialiser; +import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexSaslServer; +import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexServerFactory; + +/** + * Test for the CRAM-MD5-HEX SASL mechanism. + * + * This test case focuses on testing {@link CRAMMD5HexSaslServer} but also exercises + * collaborators {@link CRAMMD5HexInitialiser} and {@link Base64MD5PasswordFilePrincipalDatabase} + */ +public class CRAMMD5HexServerTest extends TestCase +{ + + private SaslServer _saslServer; // Class under test + private CRAMMD5HexServerFactory _saslFactory; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + CRAMMD5HexInitialiser _initializer = new CRAMMD5HexInitialiser(); + + //Use properties to create a PrincipalDatabase + Base64MD5PasswordFilePrincipalDatabase db = createTestPrincipalDatabase(); + assertEquals("Unexpected number of test users in the db", 2, db.getUsers().size()); + + _initializer.initialise(db); + + _saslFactory = new CRAMMD5HexServerFactory(); + + _saslServer = _saslFactory.createSaslServer(CRAMMD5HexSaslServer.MECHANISM, + "AMQP", + "localhost", + _initializer.getProperties(), + _initializer.getCallbackHandler()); + assertNotNull("Unable to create saslServer with mechanism type " + CRAMMD5HexSaslServer.MECHANISM, _saslServer); + + } + + public void testSuccessfulAuth() throws Exception + { + + final byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]); + + // Generate client response + final byte[] clientResponse = generateClientResponse("knownuser", "guest", serverChallenge); + + + byte[] nextServerChallenge = _saslServer.evaluateResponse(clientResponse); + assertTrue("Exchange must be flagged as complete after successful authentication", _saslServer.isComplete()); + assertNull("Next server challenge must be null after successful authentication", nextServerChallenge); + + } + + public void testKnownUserPresentsWrongPassword() throws Exception + { + byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]); + + + final byte[] clientResponse = generateClientResponse("knownuser", "wrong!", serverChallenge); + try + { + _saslServer.evaluateResponse(clientResponse); + fail("Exception not thrown"); + } + catch (SaslException se) + { + // PASS + } + assertFalse("Exchange must not be flagged as complete after unsuccessful authentication", _saslServer.isComplete()); + } + + public void testUnknownUser() throws Exception + { + final byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]); + + + final byte[] clientResponse = generateClientResponse("unknownuser", "guest", serverChallenge); + + try + { + _saslServer.evaluateResponse(clientResponse); + fail("Exception not thrown"); + } + catch (SaslException se) + { + assertExceptionHasUnderlyingAsCause(AccountNotFoundException.class, se); + // PASS + } + assertFalse("Exchange must not be flagged as complete after unsuccessful authentication", _saslServer.isComplete()); + } + + /** + * + * Demonstrates QPID-3158. A defect meant that users with some valid password were failing to + * authenticate when using the .NET 0-8 client (uses this SASL mechanism). + * It so happens that password "guest2" was one of the affected passwords. + * + * @throws Exception + */ + public void testSuccessfulAuthReproducingQpid3158() throws Exception + { + byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]); + + // Generate client response + byte[] resp = generateClientResponse("qpid3158user", "guest2", serverChallenge); + + byte[] nextServerChallenge = _saslServer.evaluateResponse(resp); + assertTrue("Exchange must be flagged as complete after successful authentication", _saslServer.isComplete()); + assertNull("Next server challenge must be null after successful authentication", nextServerChallenge); + } + + /** + * Since we don't have a CRAM-MD5-HEX implementation client implementation in Java, this method + * provides the implementation for first principals. + * + * @param userId user id + * @param clearTextPassword clear text password + * @param serverChallenge challenge from server + * + * @return challenge response + */ + private byte[] generateClientResponse(final String userId, final String clearTextPassword, final byte[] serverChallenge) throws Exception + { + byte[] digestedPasswordBytes = MessageDigest.getInstance("MD5").digest(clearTextPassword.getBytes()); + char[] hexEncodedDigestedPassword = Hex.encodeHex(digestedPasswordBytes); + byte[] hexEncodedDigestedPasswordBytes = new String(hexEncodedDigestedPassword).getBytes(); + + + Mac hmacMd5 = Mac.getInstance("HmacMD5"); + hmacMd5.init(new SecretKeySpec(hexEncodedDigestedPasswordBytes, "HmacMD5")); + final byte[] messageAuthenticationCode = hmacMd5.doFinal(serverChallenge); + + // Build client response + String responseAsString = userId + " " + new String(Hex.encodeHex(messageAuthenticationCode)); + byte[] resp = responseAsString.getBytes(); + return resp; + } + + /** + * Creates a test principal database. + * + * @return + * @throws IOException + */ + private Base64MD5PasswordFilePrincipalDatabase createTestPrincipalDatabase() throws IOException + { + Base64MD5PasswordFilePrincipalDatabase db = new Base64MD5PasswordFilePrincipalDatabase(); + File file = File.createTempFile("passwd", "db"); + file.deleteOnExit(); + db.setPasswordFile(file.getCanonicalPath()); + db.createPrincipal( createTestPrincipal("knownuser"), "guest".toCharArray()); + db.createPrincipal( createTestPrincipal("qpid3158user"), "guest2".toCharArray()); + return db; + } + + private Principal createTestPrincipal(final String name) + { + return new Principal() + { + public String getName() + { + return name; + } + }; + } + + private void assertExceptionHasUnderlyingAsCause(final Class<? extends Throwable> expectedUnderlying, Throwable e) + { + assertNotNull(e); + int infiniteLoopGuard = 0; // Guard against loops in the cause chain + boolean foundExpectedUnderlying = false; + while (e.getCause() != null && infiniteLoopGuard++ < 10) + { + if (expectedUnderlying.equals(e.getCause().getClass())) + { + foundExpectedUnderlying = true; + break; + } + e = e.getCause(); + } + + if (!foundExpectedUnderlying) + { + fail("Not found expected underlying exception " + expectedUnderlying + " as underlying cause of " + e.getClass()); + } + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipalTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipalTest.java new file mode 100644 index 0000000000..076b7c9248 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipalTest.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.sasl; + +import junit.framework.TestCase; + +public class GroupPrincipalTest extends TestCase +{ + public void testGetName() + { + final GroupPrincipal principal = new GroupPrincipal("group"); + assertEquals("group", principal.getName()); + } + + public void testAddRejected() + { + final GroupPrincipal principal = new GroupPrincipal("group"); + final UsernamePrincipal user = new UsernamePrincipal("name"); + + try + { + principal.addMember(user); + fail("Exception not thrown"); + } + catch (UnsupportedOperationException uso) + { + // PASS + } + } + + public void testEqualitySameName() + { + final String string = "string"; + final GroupPrincipal principal1 = new GroupPrincipal(string); + final GroupPrincipal principal2 = new GroupPrincipal(string); + assertTrue(principal1.equals(principal2)); + } + + public void testEqualityEqualName() + { + final GroupPrincipal principal1 = new GroupPrincipal(new String("string")); + final GroupPrincipal principal2 = new GroupPrincipal(new String("string")); + assertTrue(principal1.equals(principal2)); + } + + public void testInequalityDifferentGroupPrincipals() + { + GroupPrincipal principal1 = new GroupPrincipal("string1"); + GroupPrincipal principal2 = new GroupPrincipal("string2"); + assertFalse(principal1.equals(principal2)); + } + + public void testInequalityNonGroupPrincipal() + { + GroupPrincipal principal = new GroupPrincipal("string"); + assertFalse(principal.equals(new UsernamePrincipal("string"))); + } + + public void testInequalityNull() + { + GroupPrincipal principal = new GroupPrincipal("string"); + assertFalse(principal.equals(null)); + } + + + + +} diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/TestPrincipalUtils.java index 9bd1e7c5e1..8b9b2df5a3 100644 --- a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/TestPrincipalUtils.java @@ -18,23 +18,32 @@ * under the License. * */ -package org.apache.qpid.util; +package org.apache.qpid.server.security.auth.sasl; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; +import java.security.Principal; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; -public class MockChannel extends AMQChannel +import javax.security.auth.Subject; + +public class TestPrincipalUtils { - public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) - throws AMQException + + /** + * Creates a test subject, with exactly one UsernamePrincipal and zero or more GroupPrincipals. + */ + public static Subject createTestSubject(final String username, final String... groups) { - super(session, channelId, messageStore); + final Set<Principal> principals = new HashSet<Principal>(1 + groups.length); + principals.add(new UsernamePrincipal(username)); + for (String group : groups) + { + principals.add(new GroupPrincipal(group)); + } + + final Subject subject = new Subject(true, principals, Collections.EMPTY_SET, Collections.EMPTY_SET); + return subject; } - - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipalTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipalTest.java new file mode 100644 index 0000000000..541f14d923 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipalTest.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.sasl; + +import java.security.Principal; +import javax.security.auth.Subject; +import junit.framework.TestCase; + +/** + * Tests the UsernamePrincipal. + * + */ +public class UsernamePrincipalTest extends TestCase +{ + public void testEqualitySameObject() + { + final UsernamePrincipal principal = new UsernamePrincipal("string"); + assertTrue(principal.equals(principal)); + } + + public void testEqualitySameName() + { + final String string = "string"; + final UsernamePrincipal principal1 = new UsernamePrincipal(string); + final UsernamePrincipal principal2 = new UsernamePrincipal(string); + assertTrue(principal1.equals(principal2)); + } + + public void testEqualityEqualName() + { + final UsernamePrincipal principal1 = new UsernamePrincipal(new String("string")); + final UsernamePrincipal principal2 = new UsernamePrincipal(new String("string")); + assertTrue(principal1.equals(principal2)); + } + + public void testInequalityDifferentUserPrincipals() + { + UsernamePrincipal principal1 = new UsernamePrincipal("string1"); + UsernamePrincipal principal2 = new UsernamePrincipal("string2"); + assertFalse(principal1.equals(principal2)); + } + + public void testInequalityNonUserPrincipal() + { + UsernamePrincipal principal = new UsernamePrincipal("string"); + assertFalse(principal.equals(new String("string"))); + } + + public void testInequalityNull() + { + UsernamePrincipal principal = new UsernamePrincipal("string"); + assertFalse(principal.equals(null)); + } + + public void testGetUsernamePrincipalFromSubject() + { + final UsernamePrincipal expected = new UsernamePrincipal("name"); + final Principal other = new Principal() + { + public String getName() + { + return "otherprincipal"; + } + }; + + final Subject subject = new Subject(); + subject.getPrincipals().add(expected); + subject.getPrincipals().add(other); + + final UsernamePrincipal actual = UsernamePrincipal.getUsernamePrincipalFromSubject(subject); + assertSame(expected, actual); + } + + public void testUsernamePrincipalNotInSubject() + { + try + { + UsernamePrincipal.getUsernamePrincipalFromSubject(new Subject()); + fail("Exception not thrown"); + } + catch (IllegalArgumentException iae) + { + // PASS + } + } + + public void testTooManyUsernamePrincipalInSubject() + { + final Subject subject = new Subject(); + subject.getPrincipals().add(new UsernamePrincipal("name1")); + subject.getPrincipals().add(new UsernamePrincipal("name2")); + try + { + + UsernamePrincipal.getUsernamePrincipalFromSubject(subject); + fail("Exception not thrown"); + } + catch (IllegalArgumentException iae) + { + // PASS + } + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/signal/SignalHandlerTaskTest.java b/java/broker/src/test/java/org/apache/qpid/server/signal/SignalHandlerTaskTest.java new file mode 100644 index 0000000000..886cb080aa --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/signal/SignalHandlerTaskTest.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.signal; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.qpid.test.utils.QpidTestCase; + +public class SignalHandlerTaskTest extends QpidTestCase +{ + private static final Logger LOGGER = Logger.getLogger(SignalHandlerTaskTest.class); + private static final String SUN_MISC_SIGNAL_CLASS = "sun.misc.Signal"; + private static final String SUN_MISC_SIGNAL_HANDLER_CLASS = "sun.misc.SignalHandler"; + + protected void setUp() throws Exception + { + super.setUp(); + } + + public void testSignalHandlerTask() throws Exception + { + final boolean expectedResult = classifyExpectedRegistrationResult(); + final int pid = getPID(); + final CountDownLatch latch = new CountDownLatch(1); + + SignalHandlerTask hupReparseTask = new SignalHandlerTask() + { + public void handle() + { + latch.countDown(); + LOGGER.info("Signal handled, latch decremented"); + } + }; + + assertEquals("Unexpected result trying to register Signal handler", + expectedResult, hupReparseTask.register("HUP")); + LOGGER.info("Signal handler was registered"); + + assertEquals("unexpected count for the latch", 1, latch.getCount()); + + if(expectedResult) + { + //registration succeeded as expected, so now + //send SIGHUP and verify the handler was run + String cmd = "/bin/kill -SIGHUP " + pid; + + LOGGER.info("Sending SIGHUP"); + Runtime.getRuntime().exec(cmd); + + assertTrue("HUP Signal was not handled in the allowed timeframe", + latch.await(5, TimeUnit.SECONDS)); + } + } + + public void testGetPlatformDescription() throws Exception + { + assertNotNull(SignalHandlerTask.getPlatformDescription()); + } + + private boolean classifyExpectedRegistrationResult() + { + String os = System.getProperty("os.name"); + if(String.valueOf(os).toLowerCase().contains("windows")) + { + //Windows does not support SIGHUP so registration will fail + LOGGER.info("Running on windows, so we expect SIGHUP handler registration to fail"); + return false; + } + + //otherwise, if the signal handler classes are present we would expect + //registration to succeed + boolean classesPresent = true; + try + { + Class.forName(SUN_MISC_SIGNAL_CLASS); + Class.forName(SUN_MISC_SIGNAL_HANDLER_CLASS); + LOGGER.info("Signal handling classes were present so we expect SIGHUP handler registration to succeed"); + } + catch (ClassNotFoundException cnfe) + { + classesPresent = false; + } + + return classesPresent; + } + + private int getPID() + { + String processName = ManagementFactory.getRuntimeMXBean().getName(); + + int pid = Integer.parseInt(processName.substring(0,processName.indexOf('@'))); + LOGGER.info("PID was determined to be " + pid); + + return pid; + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java new file mode 100644 index 0000000000..fbaa1342c9 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.stats; + +import junit.framework.TestCase; + +/** + * Unit tests for the {@link StatisticsCounter} class. + */ +public class StatisticsCounterTest extends TestCase +{ + /** + * Check that statistics counters are created correctly. + */ + public void testCreate() + { + long before = System.currentTimeMillis(); + StatisticsCounter counter = new StatisticsCounter("name", 1234L); + long after = System.currentTimeMillis(); + + assertTrue(before <= counter.getStart()); + assertTrue(after >= counter.getStart()); + assertTrue(counter.getName().startsWith("name-")); + assertEquals(1234L, counter.getPeriod()); + } + + /** + * Check that totals add up correctly. + */ + public void testTotal() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + for (int i = 0; i < 100; i++) + { + counter.registerEvent(i, start + i); + } + assertEquals(99 * 50, counter.getTotal()); // cf. Gauss + } + + /** + * Test totals add up correctly even when messages are delivered + * out-of-order. + */ + public void testTotalOutOfOrder() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0, counter.getTotal()); + counter.registerEvent(10, start + 2500); + assertEquals(10, counter.getTotal()); + counter.registerEvent(20, start + 1500); + assertEquals(30, counter.getTotal()); + counter.registerEvent(10, start + 500); + assertEquals(40, counter.getTotal()); + } + + /** + * Test that the peak rate is reported correctly. + */ + public void testPeak() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + Thread.sleep(500); + counter.registerEvent(1000, start + 500); + Thread.sleep(1000); + assertEquals(1000.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + Thread.sleep(1000); + assertEquals(2000.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + Thread.sleep(1000); + assertEquals(2000.0, counter.getPeak()); + } + + /** + * Test that peak rate is reported correctly for out-of-order messages, + * and the total is also unaffected. + */ + public void testPeakOutOfOrder() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + Thread.sleep(1500); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + Thread.sleep(1000L); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + Thread.sleep(1500); + assertEquals(4000.0, counter.getPeak()); + Thread.sleep(2000); + assertEquals(4000.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + assertEquals(4000.0, counter.getPeak()); + Thread.sleep(2000); + counter.registerEvent(1000); + assertEquals(4000.0, counter.getPeak()); + assertEquals(6000, counter.getTotal()); + } + + /** + * Test the current rate is generated correctly. + */ + public void testRate() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + assertEquals(0.0, counter.getRate()); + Thread.sleep(500); + counter.registerEvent(1000); + Thread.sleep(1000); + assertEquals(1000.0, counter.getRate()); + counter.registerEvent(2000); + Thread.sleep(1000); + assertEquals(2000.0, counter.getRate()); + counter.registerEvent(1000); + Thread.sleep(1000); + assertEquals(1000.0, counter.getRate()); + Thread.sleep(1000); + assertEquals(0.0, counter.getRate()); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 3ebe631f62..3acd064fd7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -122,6 +122,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } @@ -589,7 +590,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase headerBody.classId = BasicConsumeBodyImpl.CLASS_ID; headerBody.bodySize = 0; - headerBody.properties = properties; + headerBody.setProperties(properties); try { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index a75cbe8662..2d41eb9899 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -102,7 +102,7 @@ public class ReferenceCountingTest extends QpidTestCase ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); bchp.setDeliveryMode((byte)2); - chb.properties = bchp; + chb.setProperties(bchp); return chb; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 1ec134e90e..6fbc627d8c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription; */ import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +46,7 @@ public class MockSubscription implements Subscription private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private List<QueueEntry> _acceptEntries = null; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); @@ -54,6 +56,15 @@ public class MockSubscription implements Subscription // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); + public MockSubscription() + { + } + + public MockSubscription(List<QueueEntry> acceptEntries) + { + _acceptEntries = acceptEntries; + } + public void close() { _closed = true; @@ -119,8 +130,15 @@ public class MockSubscription implements Subscription _stateChangeLock.lock(); } - public boolean hasInterest(QueueEntry msg) + public boolean hasInterest(QueueEntry entry) { + if(_acceptEntries != null) + { + //simulate selector behaviour, only signal + //interest in the dictated queue entries + return _acceptEntries.contains(entry); + } + return true; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java new file mode 100644 index 0000000000..29f45bf7f4 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.protocol.ProtocolEngine_0_10; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.server.transport.ServerSessionDelegate; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.TestNetworkConnection; + +public class SubscriptionFactoryImplTest extends InternalBrokerBaseCase +{ + /** + * Tests that while creating Subscriptions of various types, the + * ID numbers assigned are allocated from a common sequence + * (in increasing order). + */ + public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception + { + //create a No-Ack subscription, get the first Subscription ID + long previousId = 0; + Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), false, null, false, getChannel().getCreditManager()); + previousId = noAckSub.getSubscriptionID(); + + //create an ack subscription, verify the next Subscription ID is used + Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); + previousId = ackSub.getSubscriptionID(); + + //create a browser subscription + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); + previousId = browerSub.getSubscriptionID(); + + //create an BasicGet NoAck subscription + Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(getChannel(), getSession(), new AMQShortString("1"), null, false, + getChannel().getCreditManager(),getChannel().getClientDeliveryMethod(), getChannel().getRecordDeliveryMethod()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); + previousId = getNoAckSub.getSubscriptionID(); + + //create a 0-10 subscription + ServerConnection conn = new ServerConnection(1); + ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), getRegistry()); + conn.setVirtualHost(getVirtualHost()); + conn.setConnectionConfig(engine); + ServerSessionDelegate sesDel = new ServerSessionDelegate(); + Binary name = new Binary(new byte[]{new Byte("1")}); + ServerSession session = new ServerSession(conn, sesDel, name, 0, engine); + + Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java new file mode 100644 index 0000000000..c4d1a1e614 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java @@ -0,0 +1,429 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNode; +import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNodeIterator; +import org.apache.qpid.test.utils.QpidTestCase; + +public class SubscriptionListTest extends QpidTestCase +{ + private SubscriptionList _subList; + private MockSubscription _sub1; + private MockSubscription _sub2; + private MockSubscription _sub3; + private SubscriptionNode _node; + + protected void setUp() + { + _subList = new SubscriptionList(); + + _sub1 = new MockSubscription(); + _sub2 = new MockSubscription(); + _sub3 = new MockSubscription(); + + _subList.add(_sub1); + _subList.add(_sub2); + _subList.add(_sub3); + + _node = _subList.getHead(); + } + + /** + * Test that if the first (non-head) node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, and the + * subsequent viable node is returned instead. + */ + public void testFindNextSkipsFirstDeletedNode() + { + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub1).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); + } + + /** + * Test that if a central node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, + * and the subsequent viable node is returned instead. + */ + public void testFindNextSkipsCentralDeletedNode() + { + assertNotNull("Returned node should not be null", _node = _node.findNext()); + + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub2).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); + } + + /** + * Test that if the last node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, + * and null is returned instead. + */ + public void testFindNextSkipsLastDeletedNode() + { + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 1st subscription", _sub1, _node.getSubscription()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription()); + + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub3).delete()); + + assertNull("Returned node should be null", _node = _node.findNext()); + } + + /** + * Test that if multiple nodes in the list are deleted (but still present), they + * are not returned when searching through the list for the next viable node, + * and the subsequent viable node is returned instead. + */ + public void testFindNextSkipsMultipleDeletedNode() + { + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub1).delete()); + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub2).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); + } + + /** + * Test that if a node in the list is marked 'deleted' it is still present in the list + * until actually removed. counter-test to verify above testing of getNext() method. + */ + public void testDeletedNodeStillPresent() + { + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub1).delete()); + + assertNotNull("Node marked deleted should still be present", getNodeForSubscription(_subList, _sub1)); + assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList)); + } + + /** + * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given + * Subscription, or null if none is found. + */ + private SubscriptionNode getNodeForSubscription(final SubscriptionList list, final Subscription sub) + { + SubscriptionNode node = list.getHead(); + while (node != null && node.getSubscription() != sub) + { + node = node.nextNode(); + } + + return node; + } + + /** + * Counts the number of (non-head) nodes in the list. + */ + private int countNodes(final SubscriptionList list) + { + SubscriptionNode node = list.getHead(); + int count; + for(count = -1; node != null; count++) + { + node = node.nextNode(); + } + + return count; + } + + /** + * Tests that the head is returned as expected, and isn't the node for the first subscription. + */ + public void testGetHead() + { + assertNotNull("List head should be non null", _node); + assertNotSame("Head should not be node for first subscription", + _node, getNodeForSubscription(_subList, _sub1)); + } + + /** + * Tests that the size is returned correctly in the face of additions and removals. + */ + public void testGetSize() + { + SubscriptionList subList = new SubscriptionList(); + + assertEquals("Unexpected size result", 0, subList.size()); + + Subscription sub1 = new MockSubscription(); + Subscription sub2 = new MockSubscription(); + Subscription sub3 = new MockSubscription(); + + subList.add(sub1); + assertEquals("Unexpected size result", 1, subList.size()); + + subList.add(sub2); + assertEquals("Unexpected size result", 2, subList.size()); + + subList.add(sub3); + assertEquals("Unexpected size result", 3, subList.size()); + + assertTrue("Removing subscription from list should have succeeded", subList.remove(sub1)); + assertEquals("Unexpected size result", 2, subList.size()); + + assertTrue("Removing subscription from list should have succeeded", subList.remove(sub2)); + assertEquals("Unexpected size result", 1, subList.size()); + + assertTrue("Removing subscription from list should have succeeded", subList.remove(sub3)); + assertEquals("Unexpected size result", 0, subList.size()); + } + + /** + * Test that if the first (non-head) node in the list is removed it is no longer + * present in the node structure of the list at all. + */ + public void testRemoveFirstNode() + { + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); + assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub1)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); + } + + /** + * Test that if a central node in the list is removed it is no longer + * present in the node structure of the list at all. + */ + public void testRemoveCentralNode() + { + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2)); + assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub2)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); + } + + /** + * Test that if the subscription contained in the last node of the list is removed + * it is no longer present in the node structure of the list at all. However, + * as the last node in the structure can't actually be removed a dummy will instead + * be present. + */ + public void testRemoveLastNode() + { + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); + assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub3)); + + //We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons, + //however a dummy final node can be used as substitute to allow removal of the subscription node. + assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); + assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); + } + + /** + * Test that if the subscription not contained in the list is requested to be removed + * that the removal fails + */ + public void testRemoveNonExistantNode() + { + Subscription sub4 = new MockSubscription(); + assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4)); + assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4)); + assertEquals("Unexpected number of nodes", 3, countNodes(_subList)); + } + + /** + * Test that if a subscription node which occurs later in the main list than the marked node is + * removed from the list after the marked node is also removed, then the marker node doesn't + * serve to retain the subsequent nodes in the list structure (and thus memory) despite their + * removal. + */ + public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes() + { + //get the nodes out the list for the 1st and 3rd subscriptions + SubscriptionNode sub1Node = getNodeForSubscription(_subList, _sub1); + assertNotNull("Should have been a node present for the subscription", sub1Node); + SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3); + assertNotNull("Should have been a node present for the subscription", sub3Node); + + //mark the first subscription node + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node)); + + //remove the 1st subscription from the list + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); + //verify the 1st subscription is no longer the marker node (replaced by a dummy), or in the main list structure + assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode()); + assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node", + getNodeForSubscription(_subList, _sub1)); + + //remove the 2nd subscription from the list + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2)); + + //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node + //in its list structure is now the 3rd subscription (since the 2nd was removed too) + assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); + + //remove the 3rd and final/tail subscription + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); + + //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node + //in its list structure is now the dummy tail (since the 3rd subscription was removed, and a dummy + //tail was inserted) and NOT the 3rd sub node. + assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); + assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted()); + assertNull("Next non-deleted node from the marker should now be the list end, i.e. null", _subList.getMarkedNode().findNext()); + } + + /** + * Test that the marked node 'findNext' behaviour is as expected after a subscription is added + * to the list following the tail subscription node being removed while it is the marked node. + * That is, that the new subscriptions node is returned by getMarkedNode().findNext(). + */ + public void testMarkedNodeFindsNewSubscriptionAfterRemovingTailWhilstMarked() + { + //get the node out the list for the 3rd subscription + SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3); + assertNotNull("Should have been a node present for the subscription", sub3Node); + + //mark the 3rd subscription node + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node)); + + //verify calling findNext on the marked node returns null, i.e. the end of the list has been reached + assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext()); + + //remove the 3rd(marked) subscription from the list + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); + + //add a new 4th subscription to the list + Subscription sub4 = new MockSubscription(); + _subList.add(sub4); + + //get the node out the list for the 4th subscription + SubscriptionNode sub4Node = getNodeForSubscription(_subList, sub4); + assertNotNull("Should have been a node present for the subscription", sub4Node); + + //verify the marked node (which is now a dummy substitute for the 3rd subscription) returns + //the 4th subscriptions node as the next non-deleted node. + assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext()); + } + + /** + * Test that setting the marked node to null doesn't cause problems during remove operations + */ + public void testRemoveWithNullMarkedNode() + { + //set the marker to null + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), null)); + + //remove the 1st subscription from the main list + assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); + + //verify the 1st subscription is no longer in the main list structure + assertNull("Should not have been a node present in the main list structure for sub1", + getNodeForSubscription(_subList, _sub1)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + } + + /** + * Tests that after the first (non-head) node of the list is marked deleted but has not + * yet been removed, the iterator still skips it. + */ + public void testIteratorSkipsFirstDeletedNode() + { + //'delete' but dont remove the node for the 1st subscription + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub1).delete()); + assertNotNull("Should still have been a node present for the deleted subscription", + getNodeForSubscription(_subList, _sub1)); + + SubscriptionNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 2nd subscriptions node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription()); + + //verify the iterator returns the 3rd subscriptions node and not the 2nd. + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription()); + } + + /** + * Tests that after a central node of the list is marked deleted but has not yet been removed, + * the iterator still skips it. + */ + public void testIteratorSkipsCentralDeletedNode() + { + //'delete' but dont remove the node for the 2nd subscription + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub2).delete()); + assertNotNull("Should still have been a node present for the deleted subscription", + getNodeForSubscription(_subList, _sub2)); + + SubscriptionNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 1st subscriptions node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription()); + + //verify the iterator returns the 3rd subscriptions node and not the 2nd. + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription()); + } + + /** + * Tests that after the last node of the list is marked deleted but has not yet been removed, + * the iterator still skips it. + */ + public void testIteratorSkipsDeletedFinalNode() + { + //'delete' but dont remove the node for the 3rd subscription + assertTrue("Deleting subscription node should have succeeded", + getNodeForSubscription(_subList, _sub3).delete()); + assertNotNull("Should still have been a node present for the deleted 3rd subscription", + getNodeForSubscription(_subList, _sub3)); + + SubscriptionNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 1st subscriptions node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription()); + + //verify the iterator returns the 2nd subscriptions node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription()); + + //verify the iterator can no longer advance and does not return a subscription node + assertFalse("Iterator should not have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected SubscriptionNode", null, iter.getNode()); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java b/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java index 975e3e91b9..15c135ea2c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/MockAction.java @@ -32,13 +32,11 @@ class MockAction implements Action private boolean _rollbackFired = false; private boolean _postCommitFired = false; - @Override public void postCommit() { _postCommitFired = true; } - @Override public void onRollback() { _rollbackFired = true; diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 64c62fd029..422105e410 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -46,67 +46,56 @@ class MockServerMessage implements ServerMessage this.persistent = persistent; } - @Override public boolean isPersistent() { return persistent; } - @Override public MessageReference newReference() { throw new NotImplementedException(); } - @Override public boolean isImmediate() { throw new NotImplementedException(); } - @Override public long getSize() { throw new NotImplementedException(); } - @Override public SessionConfig getSessionConfig() { throw new NotImplementedException(); } - @Override public String getRoutingKey() { throw new NotImplementedException(); } - @Override public AMQMessageHeader getMessageHeader() { throw new NotImplementedException(); } - @Override public long getExpiration() { throw new NotImplementedException(); } - @Override public int getContent(ByteBuffer buf, int offset) { throw new NotImplementedException(); } - @Override public long getArrivalTime() { throw new NotImplementedException(); } - @Override public Long getMessageNumber() { return 0L; diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index 5700bba9f8..ff372532ac 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -61,7 +61,6 @@ class MockStoreTransaction implements Transaction return _state; } - @Override public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { if (_throwExceptionOnQueueOp) @@ -83,8 +82,6 @@ class MockStoreTransaction implements Transaction return _numberOfEnqueuedMessages; } - - @Override public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException { if (_throwExceptionOnQueueOp) @@ -95,19 +92,16 @@ class MockStoreTransaction implements Transaction _numberOfDequeuedMessages++; } - @Override public void commitTran() throws AMQStoreException { _state = TransactionState.COMMITTED; } - @Override public StoreFuture commitTranAsync() throws AMQStoreException { throw new NotImplementedException(); } - @Override public void abortTran() throws AMQStoreException { _state = TransactionState.ABORTED; @@ -117,14 +111,11 @@ class MockStoreTransaction implements Transaction { return new TransactionLog() { - - @Override public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, Configuration storeConfiguration, LogSubject logSubject) throws Exception { } - - @Override + public Transaction newTransaction() { storeTransaction.setState(TransactionState.STARTED); diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 925b161118..a97134a58d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -44,14 +44,13 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.MockChannel; public class InternalBrokerBaseCase extends QpidTestCase { private IApplicationRegistry _registry; private MessageStore _messageStore; - private MockChannel _channel; + private AMQChannel _channel; private InternalTestProtocolSession _session; private VirtualHost _virtualHost; private AMQQueue _queue; @@ -111,7 +110,7 @@ public class InternalBrokerBaseCase extends QpidTestCase _session = new InternalTestProtocolSession(_virtualHost); CurrentActor.set(_session.getLogActor()); - _channel = new MockChannel(_session, 1, _messageStore); + _channel = new AMQChannel(_session, 1, _messageStore); _session.addChannel(_channel); } @@ -243,7 +242,7 @@ public class InternalBrokerBaseCase extends QpidTestCase //Make Message Persistent properties.setDeliveryMode((byte) 2); - _headerBody.properties = properties; + _headerBody.setProperties(properties); channel.publishContentHeader(_headerBody); } @@ -283,12 +282,12 @@ public class InternalBrokerBaseCase extends QpidTestCase _messageStore = messageStore; } - public MockChannel getChannel() + public AMQChannel getChannel() { return _channel; } - public void setChannel(MockChannel channel) + public void setChannel(AMQChannel channel) { _channel = channel; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index af8997cf40..3c6857e8a9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -20,27 +20,72 @@ */ package org.apache.qpid.server.util; +import java.util.Properties; + import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; - -import java.util.Properties; - +import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; public class TestApplicationRegistry extends ApplicationRegistry { + public TestApplicationRegistry(ServerConfiguration config) throws ConfigurationException { super(config); } - protected void createDatabaseManager(ServerConfiguration configuration) throws Exception + @Override + public void initialise() throws Exception + { + CurrentActor.setDefault(new BrokerActor(new NullRootMessageLogger())); + GenericActor.setDefaultMessageLogger(new NullRootMessageLogger()); + super.initialise(); + } + + /** + * @see org.apache.qpid.server.registry.ApplicationRegistry#createAuthenticationManager() + */ + @Override + protected AuthenticationManager createAuthenticationManager() throws ConfigurationException { - Properties users = new Properties(); + final Properties users = new Properties(); users.put("guest","guest"); users.put("admin","admin"); - _databaseManager = new PropertiesPrincipalDatabaseManager("testPasswordFile", users); + + final PropertiesPrincipalDatabase ppd = new PropertiesPrincipalDatabase(users); + + AuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager() + { + + /** + * @see org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin) + */ + @Override + public void configure(ConfigurationPlugin config) throws ConfigurationException + { + // We don't pass configuration to this test instance. + } + + @Override + public void initialise() + { + setPrincipalDatabase(ppd); + + super.initialise(); + } + }; + + pdam.initialise(); + + return pdam; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java new file mode 100644 index 0000000000..98bf381712 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.concurrent.CountDownLatch; + +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.test.utils.QpidTestCase; + +public class HouseKeepingTaskTest extends QpidTestCase +{ + /** + * Tests that the abstract HouseKeepingTask properly cleans up any LogActor + * it adds to the CurrentActor stack by verifying the CurrentActor set + * before task execution is the CurrentActor after execution. + */ + public void testCurrentActorStackBalance() throws Exception + { + //create and set a test actor + LogActor testActor = new TestLogActor(new NullRootMessageLogger()); + CurrentActor.set(testActor); + + //verify it is returned correctly before executing a HouseKeepingTask + assertEquals("Expected LogActor was not returned", testActor, CurrentActor.get()); + + final CountDownLatch latch = new CountDownLatch(1); + HouseKeepingTask testTask = new HouseKeepingTask(new MockVirtualHost("HouseKeepingTaskTestVhost")) + { + @Override + public void execute() + { + latch.countDown(); + } + }; + + //run the test HouseKeepingTask using the current Thread to influence its CurrentActor stack + testTask.run(); + + assertEquals("The expected LogActor was not returned, the CurrentActor stack has become unbalanced", + testActor, CurrentActor.get()); + assertEquals("HouseKeepingTask execute() method was not run", 0, latch.getCount()); + + //clean up the test actor + CurrentActor.remove(); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java new file mode 100644 index 0000000000..7aa314bf22 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.UUID; + +import org.apache.qpid.server.binding.BindingFactory; +import org.apache.qpid.server.configuration.BrokerConfig; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.VirtualHostConfig; +import org.apache.qpid.server.configuration.VirtualHostConfigType; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLog; + +public class MockVirtualHost implements VirtualHost +{ + private String _name; + + public MockVirtualHost(String name) + { + _name = name; + } + + public void close() + { + + } + + public void createBrokerConnection(String transport, String host, int port, + String vhost, boolean durable, String authMechanism, + String username, String password) + { + + } + + public IApplicationRegistry getApplicationRegistry() + { + return null; + } + + public AuthenticationManager getAuthenticationManager() + { + return null; + } + + public BindingFactory getBindingFactory() + { + return null; + } + + public UUID getBrokerId() + { + return null; + } + + public ConfigStore getConfigStore() + { + return null; + } + + public VirtualHostConfiguration getConfiguration() + { + return null; + } + + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + public ExchangeFactory getExchangeFactory() + { + return null; + } + + public ExchangeRegistry getExchangeRegistry() + { + return null; + } + + public int getHouseKeepingActiveCount() + { + return 0; + } + + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + public int getHouseKeepingPoolSize() + { + return 0; + } + + public long getHouseKeepingTaskCount() + { + return 0; + } + + public ManagedObject getManagedObject() + { + return null; + } + + public MessageStore getMessageStore() + { + return null; + } + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return null; + } + + public SecurityManager getSecurityManager() + { + return null; + } + + public TransactionLog getTransactionLog() + { + return null; + } + + public void removeBrokerConnection(BrokerLink brokerLink) + { + + } + + public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) + { + + } + + public void setHouseKeepingPoolSize(int newSize) + { + + } + + public BrokerConfig getBroker() + { + return null; + } + + public String getFederationTag() + { + return null; + } + + public void setBroker(BrokerConfig brokerConfig) + { + + } + + public VirtualHostConfigType getConfigType() + { + return null; + } + + public long getCreateTime() + { + return 0; + } + + public UUID getId() + { + return null; + } + + public ConfiguredObject<VirtualHostConfigType, VirtualHostConfig> getParent() + { + return null; + } + + public boolean isDurable() + { + return false; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return null; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return null; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return null; + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return null; + } + + public void initialiseStatistics() + { + + } + + public boolean isStatisticsEnabled() + { + return false; + } + + public void registerMessageDelivered(long messageSize) + { + + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + + } + + public void resetStatistics() + { + + } + + public void setStatisticsEnabled(boolean enabled) + { + + } +}
\ No newline at end of file diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java new file mode 100644 index 0000000000..c87e5a1648 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.test.utils.QpidTestCase; + +public class VirtualHostImplTest extends QpidTestCase +{ + private ServerConfiguration _configuration; + private ApplicationRegistry _registry; + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + + ApplicationRegistry.remove(); + } + + /** + * Tests that custom routing keys for the queue specified in the configuration + * file are correctly bound to the exchange (in addition to the queue name) + */ + public void testSpecifyingCustomBindings() throws Exception + { + customBindingTestImpl(new String[]{"custom1","custom2"}); + } + + /** + * Tests that a queue specified in the configuration file to be bound to a + * specified(non-default) direct exchange is a correctly bound to the exchange + * and the default exchange using the queue name. + */ + public void testQueueSpecifiedInConfigurationIsBoundToDefaultExchange() throws Exception + { + customBindingTestImpl(new String[0]); + } + + private void customBindingTestImpl(final String[] routingKeys) throws Exception + { + String exchangeName = getName() +".direct"; + String vhostName = getName(); + String queueName = getName(); + + File config = writeConfigFile(vhostName, queueName, exchangeName, false, routingKeys); + VirtualHost vhost = createVirtualHost(vhostName, config); + assertNotNull("virtualhost should exist", vhost); + + AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + assertNotNull("queue should exist", queue); + + Exchange defaultExch = vhost.getExchangeRegistry().getDefaultExchange(); + assertTrue("queue should have been bound to default exchange with its name", defaultExch.isBound(queueName, queue)); + + Exchange exch = vhost.getExchangeRegistry().getExchange(exchangeName); + assertTrue("queue should have been bound to " + exchangeName + " with its name", exch.isBound(queueName, queue)); + + for(String key: routingKeys) + { + assertTrue("queue should have been bound to " + exchangeName + " with key " + key, exch.isBound(key, queue)); + } + } + + /** + * Tests that specifying custom routing keys for a queue in the configuration file results in failure + * to create the vhost (since this is illegal, only queue names are used with the default exchange) + */ + public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception + { + File config = writeConfigFile(getName(), getName(), null, false, new String[]{"custom-binding"}); + + try + { + createVirtualHost(getName(), config); + fail("virtualhost creation should have failed due to illegal configuration"); + } + catch (ConfigurationException e) + { + //expected + } + } + + /** + * Tests that specifying an unknown exchange to bind the queue to results in failure to create the vhost + */ + public void testSpecifyingUnknownExchangeThrowsException() throws Exception + { + File config = writeConfigFile(getName(), getName(), "made-up-exchange", true, new String[0]); + + try + { + createVirtualHost(getName(), config); + fail("virtualhost creation should have failed due to illegal configuration"); + } + catch (ConfigurationException e) + { + //expected + } + } + + private VirtualHost createVirtualHost(String vhostName, File config) throws Exception + { + _configuration = new ServerConfiguration(new XMLConfiguration(config)); + + _registry = new TestApplicationRegistry(_configuration); + ApplicationRegistry.initialise(_registry); + + return _registry.getVirtualHostRegistry().getVirtualHost(vhostName); + } + + /** + * Create a configuration file for testing virtualhost creation + * + * @param vhostName name of the virtualhost + * @param queueName name of the queue + * @param exchangeName name of a direct exchange to declare (unless dontDeclare = true) and bind the queue to (null = none) + * @param dontDeclare if true then dont declare the exchange, even if its name is non-null + * @param routingKeys routingKeys to bind the queue with (empty array = none) + * @return + */ + private File writeConfigFile(String vhostName, String queueName, String exchangeName, boolean dontDeclare, String[] routingKeys) + { + File tmpFile = null; + try + { + tmpFile = File.createTempFile(getName(), ".tmp"); + tmpFile.deleteOnExit(); + + FileWriter fstream = new FileWriter(tmpFile); + BufferedWriter writer = new BufferedWriter(fstream); + + //extra outer tag to please Commons Configuration + writer.write("<configuration>"); + + writer.write("<virtualhosts>"); + writer.write(" <default>" + vhostName + "</default>"); + writer.write(" <virtualhost>"); + writer.write(" <store>"); + writer.write(" <class>" + TestableMemoryMessageStore.class.getName() + "</class>"); + writer.write(" </store>"); + writer.write(" <name>" + vhostName + "</name>"); + writer.write(" <" + vhostName + ">"); + if(exchangeName != null && !dontDeclare) + { + writer.write(" <exchanges>"); + writer.write(" <exchange>"); + writer.write(" <type>direct</type>"); + writer.write(" <name>" + exchangeName + "</name>"); + writer.write(" </exchange>"); + writer.write(" </exchanges>"); + } + writer.write(" <queues>"); + writer.write(" <queue>"); + writer.write(" <name>" + queueName + "</name>"); + writer.write(" <" + queueName + ">"); + if(exchangeName != null) + { + writer.write(" <exchange>" + exchangeName + "</exchange>"); + } + for(String routingKey: routingKeys) + { + writer.write(" <routingKey>" + routingKey + "</routingKey>"); + } + writer.write(" </" + queueName + ">"); + writer.write(" </queue>"); + writer.write(" </queues>"); + writer.write(" </" + vhostName + ">"); + writer.write(" </virtualhost>"); + writer.write("</virtualhosts>"); + + writer.write("</configuration>"); + + writer.flush(); + writer.close(); + } + catch (IOException e) + { + fail("Unable to create virtualhost configuration"); + } + + return tmpFile; + } +} |