diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java | 303 |
1 files changed, 134 insertions, 169 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index c8ccdf91bb..ae38a75e7a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -25,7 +25,6 @@ import java.io.InputStreamReader; import java.io.LineNumberReader; import java.io.PrintStream; import java.net.MalformedURLException; -import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,20 +54,19 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; -import org.apache.qpid.server.Broker; -import org.apache.qpid.server.BrokerOptions; -import org.apache.qpid.server.ProtocolExclusion; import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.store.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.util.FileUtils; import org.apache.qpid.util.LogMonitor; /** @@ -76,13 +74,6 @@ import org.apache.qpid.util.LogMonitor; */ public class QpidBrokerTestCase extends QpidTestCase { - - public enum BrokerType - { - EXTERNAL /** Test case relies on a Broker started independently of the test-suite */, - INTERNAL /** Test case starts an embedded broker within this JVM */, - SPAWNED /** Test case spawns a new broker as a separate process */ - } protected final String QpidHome = System.getProperty("QPID_HOME"); protected File _configFile = new File(System.getProperty("broker.config")); @@ -91,6 +82,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected long RECEIVE_TIMEOUT = 1000l; + private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>(); @@ -114,11 +106,9 @@ public class QpidBrokerTestCase extends QpidTestCase // system properties private static final String BROKER_LANGUAGE = "broker.language"; - private static final String BROKER_TYPE = "broker.type"; - private static final String BROKER_COMMAND = "broker.command"; + private static final String BROKER = "broker"; private static final String BROKER_CLEAN = "broker.clean"; private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; - private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; private static final String BROKER_VERSION = "broker.version"; protected static final String BROKER_READY = "broker.ready"; private static final String BROKER_STOPPED = "broker.stopped"; @@ -126,30 +116,29 @@ public class QpidBrokerTestCase extends QpidTestCase private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave"; private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; private static final String BROKER_PERSITENT = "broker.persistent"; - private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes"; - // values protected static final String JAVA = "java"; protected static final String CPP = "cpp"; + protected static final String VM = "vm"; + protected static final String EXTERNAL = "external"; + private static final String VERSION_08 = "0-8"; + private static final String VERSION_010 = "0-10"; protected static final String QPID_HOME = "QPID_HOME"; public static final int DEFAULT_VM_PORT = 1; public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT); - public static final int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT); public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.sslport", ServerConfiguration.DEFAULT_SSL_PORT); protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); - protected BrokerType _brokerType = BrokerType.valueOf(System.getProperty(BROKER_TYPE, "").toUpperCase()); - protected String _brokerCommand = System.getProperty(BROKER_COMMAND); + protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); - private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, "")); + private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); protected String _output = System.getProperty(TEST_OUTPUT); protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); - private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES); protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: "); protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE); @@ -158,7 +147,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected PrintStream _brokerOutputStream; - protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>(); + protected Map<Integer, Process> _brokers = new HashMap<Integer, Process>(); protected InitialContext _initialContext; protected AMQConnectionFactory _connectionFactory; @@ -294,16 +283,6 @@ public class QpidBrokerTestCase extends QpidTestCase fail("Unable to test without config file:" + _configFile); } - String existingQpidWorkPath = System.getProperty(BROKER_EXISTING_QPID_WORK); - if(existingQpidWorkPath != null && !existingQpidWorkPath.equals("")) - { - cleanBroker(); - - File existing = new File(existingQpidWorkPath); - File qpidWork = new File(getQpidWork(_brokerType, getPort())); - FileUtils.copyRecursive(existing, qpidWork); - } - startBroker(); } @@ -404,8 +383,13 @@ public class QpidBrokerTestCase extends QpidTestCase } } + public void startBroker() throws Exception + { + startBroker(0); + } + /** - * Return the management port in use by the broker on this main port + * Return the management portin use by the broker on this main port * * @param mainPort the broker's main port. * @@ -413,7 +397,7 @@ public class QpidBrokerTestCase extends QpidTestCase */ protected int getManagementPort(int mainPort) { - return mainPort + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT); + return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT)); } /** @@ -428,7 +412,11 @@ public class QpidBrokerTestCase extends QpidTestCase protected int getPort(int port) { - if (!_brokerType.equals(BrokerType.EXTERNAL)) + if (_broker.equals(VM)) + { + return port == 0 ? DEFAULT_VM_PORT : port; + } + else if (!_broker.equals(EXTERNAL)) { return port == 0 ? DEFAULT_PORT : port; } @@ -440,18 +428,11 @@ public class QpidBrokerTestCase extends QpidTestCase protected String getBrokerCommand(int port) throws MalformedURLException { - final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port); - return _brokerCommand + return _broker .replace("@PORT", "" + port) .replace("@SSL_PORT", "" + (port - 1)) .replace("@MPORT", "" + getManagementPort(port)) - .replace("@CONFIG_FILE", _configFile.toString()) - .replace("@EXCLUDES", protocolExcludesList); - } - - public void startBroker() throws Exception - { - startBroker(0); + .replace("@CONFIG_FILE", _configFile.toString()); } public void startBroker(int port) throws Exception @@ -462,38 +443,38 @@ public class QpidBrokerTestCase extends QpidTestCase saveTestConfiguration(); saveTestVirtualhosts(); - if(_brokers.get(port) != null) - { - throw new IllegalStateException("There is already an existing broker running on port " + port); - } - - if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker()) + Process process = null; + if (_broker.equals(VM)) { + setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port))); setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); saveTestConfiguration(); - - BrokerOptions options = new BrokerOptions(); - options.setConfigFile(_configFile.getAbsolutePath()); - options.addPort(port); - - addExcludedPorts(port, options); - - options.setJmxPort(getManagementPort(port)); - - //Set the log config file, relying on the log4j.configuration system property - //set on the JVM by the JUnit runner task in module.xml. - options.setLogConfigFile(new URL(System.getProperty("log4j.configuration")).getFile()); - - Broker broker = new Broker(); - _logger.info("starting internal broker (same JVM)"); - broker.startup(options); - - _brokers.put(port, new InternalBrokerHolder(broker)); + + // create an in_VM broker + final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile); + try + { + ApplicationRegistry.initialise(registry, port); + } + catch (Exception e) + { + _logger.error("Broker initialise failed due to:",e); + try + { + registry.close(); + } + catch (Throwable closeE) + { + closeE.printStackTrace(); + } + throw e; + } + TransportConnection.createVMBroker(port); } - else if (!_brokerType.equals(BrokerType.EXTERNAL)) + else if (!_broker.equals(EXTERNAL)) { String cmd = getBrokerCommand(port); - _logger.info("starting external broker: " + cmd); + _logger.info("starting broker: " + cmd); ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); pb.redirectErrorStream(true); @@ -509,7 +490,7 @@ public class QpidBrokerTestCase extends QpidTestCase // DON'T change PNAME, qpid.stop needs this value. env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\""); // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests - env.put("QPID_WORK", getQpidWork(_brokerType, port)); + env.put("QPID_WORK", System.getProperty("QPID_WORK")+ "/" + port); // Use the environment variable to set amqj.logging.level for the broker @@ -561,7 +542,8 @@ public class QpidBrokerTestCase extends QpidTestCase env.put("QPID_OPTS", QPID_OPTS); } } - Process process = pb.start();; + + process = pb.start(); Piper p = new Piper(process.getInputStream(), _brokerOutputStream, @@ -582,7 +564,6 @@ public class QpidBrokerTestCase extends QpidTestCase try { - //test that the broker is still running and hasn't exited unexpectedly int exit = process.exitValue(); _logger.info("broker aborted: " + exit); cleanBroker(); @@ -590,58 +571,11 @@ public class QpidBrokerTestCase extends QpidTestCase } catch (IllegalThreadStateException e) { - // this is expect if the broker started successfully + // this is expect if the broker started succesfully } - - _brokers.put(port, new SpawnedBrokerHolder(process)); } - } - private void addExcludedPorts(int port, BrokerOptions options) - { - final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port); - - if (protocolExcludesList.equals("")) - { - return; - } - final String[] toks = protocolExcludesList.split("\\s"); - - if(toks.length % 2 != 0) - { - throw new IllegalArgumentException("Must be an even number of tokens in '" + protocolExcludesList + "'"); - } - for (int i = 0; i < toks.length; i=i+2) - { - String excludeArg = toks[i]; - final int excludedPort = Integer.parseInt(toks[i+1]); - options.addExcludedPort(ProtocolExclusion.lookup(excludeArg), excludedPort); - - _logger.info("Adding protocol exclusion " + excludeArg + " " + excludedPort); - } - } - - private boolean existingInternalBroker() - { - for(BrokerHolder holder : _brokers.values()) - { - if(holder instanceof InternalBrokerHolder) - { - return true; - } - } - - return false; - } - - private String getQpidWork(BrokerType broker, int port) - { - if (!broker.equals(BrokerType.EXTERNAL)) - { - return System.getProperty("QPID_WORK")+ "/" + port; - } - - return System.getProperty("QPID_WORK"); + _brokers.put(port, process); } public String getTestConfigFile() @@ -722,17 +656,20 @@ public class QpidBrokerTestCase extends QpidTestCase port = getPort(port); _logger.info("stopping broker: " + getBrokerCommand(port)); - BrokerHolder broker = _brokers.remove(port); - broker.shutdown(); + Process process = _brokers.remove(port); + if (process != null) + { + process.destroy(); + process.waitFor(); + _logger.info("broker exited: " + process.exitValue()); + } + else if (_broker.equals(VM)) + { + TransportConnection.killVMBroker(port); + ApplicationRegistry.remove(port); + } } - public boolean isBrokerPresent(int port) throws Exception - { - port = getPort(port); - - return _brokers.containsKey(port); - } - /** * Attempt to set the Java Broker to use the BDBMessageStore for persistence * Falling back to the DerbyMessageStore if @@ -874,14 +811,20 @@ public class QpidBrokerTestCase extends QpidTestCase } /** - * Set a System property for the client (and broker if using the same vm) of this test. + * Set a System (-D) property for the external Broker of this test. * * @param property The property to set * @param value the value to set it to. */ protected void setTestClientSystemProperty(String property, String value) { - setTestSystemProperty(property, value); + if (!_propertiesSetForTestOnly.containsKey(property)) + { + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); } /** @@ -889,7 +832,20 @@ public class QpidBrokerTestCase extends QpidTestCase */ protected void revertSystemProperties() { - revertTestSystemProperties(); + for (String key : _propertiesSetForTestOnly.keySet()) + { + String value = _propertiesSetForTestOnly.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + + _propertiesSetForTestOnly.clear(); // We don't change the current VMs settings for Broker only properties // so we can just clear this map @@ -948,17 +904,17 @@ public class QpidBrokerTestCase extends QpidTestCase */ public boolean isBroker08() { - return _brokerVersion.equals(AmqpProtocolVersion.v0_8); + return _brokerVersion.equals(VERSION_08); } public boolean isBroker010() { - return _brokerVersion.equals(AmqpProtocolVersion.v0_10); + return _brokerVersion.equals(VERSION_010); } protected boolean isJavaBroker() { - return _brokerLanguage.equals("java") || _brokerType.equals("vm"); + return _brokerLanguage.equals("java") || _broker.equals("vm"); } protected boolean isCppBroker() @@ -968,14 +924,9 @@ public class QpidBrokerTestCase extends QpidTestCase protected boolean isExternalBroker() { - return !_brokerType.equals("vm"); //TODO + return !_broker.equals("vm"); } - - protected boolean isInternalBroker() - { - return _brokerType.equals(BrokerType.INTERNAL); - } - + protected boolean isBrokerStorePersistent() { return _brokerPersistent; @@ -1047,6 +998,11 @@ public class QpidBrokerTestCase extends QpidTestCase */ public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException { + if (_broker.equals(VM)) + { + factoryName += ".vm"; + } + return (AMQConnectionFactory) getInitialContext().lookup(factoryName); } @@ -1087,7 +1043,15 @@ public class QpidBrokerTestCase extends QpidTestCase public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException { _logger.info("get Connection"); - Connection con = getConnectionFactory().createConnection(username, password, id); + Connection con; + if (_broker.equals(VM)) + { + con = new AMQConnection("vm://:1", username, password, id, "test"); + } + else + { + con = getConnectionFactory().createConnection(username, password, id); + } //add the connection in the lis of connections _connections.add(con); return con; @@ -1125,8 +1089,7 @@ public class QpidBrokerTestCase extends QpidTestCase c.close(); } } - finally - { + finally{ // Ensure any problems with close does not interfer with property resets revertSystemProperties(); revertLoggingLevels(); @@ -1227,8 +1190,7 @@ public class QpidBrokerTestCase extends QpidTestCase MessageProducer producer = session.createProducer(destination); - int i = offset; - for (; i < (count + offset); i++) + for (int i = offset; i < (count + offset); i++) { Message next = createNextMessage(session, i); @@ -1251,7 +1213,7 @@ public class QpidBrokerTestCase extends QpidTestCase // we have no batchSize or // our count is not divible by batchSize. if (session.getTransacted() && - ( batchSize == 0 || (i-1) % batchSize != 0)) + ( batchSize == 0 || count % batchSize != 0)) { session.commit(); } @@ -1346,26 +1308,29 @@ public class QpidBrokerTestCase extends QpidTestCase */ public void reloadBrokerSecurityConfig() throws Exception { - JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin"); - jmxu.open(); - - try + if (_broker.equals(VM)) { - ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); - configMBean.reloadSecurityConfiguration(); + ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections(); } - finally + else { - jmxu.close(); - } - - LogMonitor _monitor = new LogMonitor(_outputFile); - assertTrue("The expected server security configuration reload did not occur", - _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); - } + JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin"); + jmxu.open(); + + try + { + ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); + configMBean.reloadSecurityConfiguration(); + } + finally + { + jmxu.close(); + } + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected server security configuration reload did not occur", + _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); - protected int getFailingPort() - { - return FAILING_PORT; + } } } |