summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java303
1 files changed, 134 insertions, 169 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index c8ccdf91bb..ae38a75e7a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -25,7 +25,6 @@ import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.PrintStream;
import java.net.MalformedURLException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -55,20 +54,19 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
-import org.apache.qpid.server.Broker;
-import org.apache.qpid.server.BrokerOptions;
-import org.apache.qpid.server.ProtocolExclusion;
import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.store.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.LogMonitor;
/**
@@ -76,13 +74,6 @@ import org.apache.qpid.util.LogMonitor;
*/
public class QpidBrokerTestCase extends QpidTestCase
{
-
- public enum BrokerType
- {
- EXTERNAL /** Test case relies on a Broker started independently of the test-suite */,
- INTERNAL /** Test case starts an embedded broker within this JVM */,
- SPAWNED /** Test case spawns a new broker as a separate process */
- }
protected final String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
@@ -91,6 +82,7 @@ public class QpidBrokerTestCase extends QpidTestCase
protected long RECEIVE_TIMEOUT = 1000l;
+ private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>();
private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>();
@@ -114,11 +106,9 @@ public class QpidBrokerTestCase extends QpidTestCase
// system properties
private static final String BROKER_LANGUAGE = "broker.language";
- private static final String BROKER_TYPE = "broker.type";
- private static final String BROKER_COMMAND = "broker.command";
+ private static final String BROKER = "broker";
private static final String BROKER_CLEAN = "broker.clean";
private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
- private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work";
private static final String BROKER_VERSION = "broker.version";
protected static final String BROKER_READY = "broker.ready";
private static final String BROKER_STOPPED = "broker.stopped";
@@ -126,30 +116,29 @@ public class QpidBrokerTestCase extends QpidTestCase
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
private static final String BROKER_PERSITENT = "broker.persistent";
- private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes";
-
// values
protected static final String JAVA = "java";
protected static final String CPP = "cpp";
+ protected static final String VM = "vm";
+ protected static final String EXTERNAL = "external";
+ private static final String VERSION_08 = "0-8";
+ private static final String VERSION_010 = "0-10";
protected static final String QPID_HOME = "QPID_HOME";
public static final int DEFAULT_VM_PORT = 1;
public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT);
- public static final int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT);
public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.sslport", ServerConfiguration.DEFAULT_SSL_PORT);
protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA);
- protected BrokerType _brokerType = BrokerType.valueOf(System.getProperty(BROKER_TYPE, "").toUpperCase());
- protected String _brokerCommand = System.getProperty(BROKER_COMMAND);
+ protected String _broker = System.getProperty(BROKER, VM);
private String _brokerClean = System.getProperty(BROKER_CLEAN, null);
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
- private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, ""));
+ private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
protected String _output = System.getProperty(TEST_OUTPUT);
protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
- private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES);
protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
@@ -158,7 +147,7 @@ public class QpidBrokerTestCase extends QpidTestCase
protected PrintStream _brokerOutputStream;
- protected Map<Integer, BrokerHolder> _brokers = new HashMap<Integer, BrokerHolder>();
+ protected Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
protected InitialContext _initialContext;
protected AMQConnectionFactory _connectionFactory;
@@ -294,16 +283,6 @@ public class QpidBrokerTestCase extends QpidTestCase
fail("Unable to test without config file:" + _configFile);
}
- String existingQpidWorkPath = System.getProperty(BROKER_EXISTING_QPID_WORK);
- if(existingQpidWorkPath != null && !existingQpidWorkPath.equals(""))
- {
- cleanBroker();
-
- File existing = new File(existingQpidWorkPath);
- File qpidWork = new File(getQpidWork(_brokerType, getPort()));
- FileUtils.copyRecursive(existing, qpidWork);
- }
-
startBroker();
}
@@ -404,8 +383,13 @@ public class QpidBrokerTestCase extends QpidTestCase
}
}
+ public void startBroker() throws Exception
+ {
+ startBroker(0);
+ }
+
/**
- * Return the management port in use by the broker on this main port
+ * Return the management portin use by the broker on this main port
*
* @param mainPort the broker's main port.
*
@@ -413,7 +397,7 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
protected int getManagementPort(int mainPort)
{
- return mainPort + (DEFAULT_MANAGEMENT_PORT - DEFAULT_PORT);
+ return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT));
}
/**
@@ -428,7 +412,11 @@ public class QpidBrokerTestCase extends QpidTestCase
protected int getPort(int port)
{
- if (!_brokerType.equals(BrokerType.EXTERNAL))
+ if (_broker.equals(VM))
+ {
+ return port == 0 ? DEFAULT_VM_PORT : port;
+ }
+ else if (!_broker.equals(EXTERNAL))
{
return port == 0 ? DEFAULT_PORT : port;
}
@@ -440,18 +428,11 @@ public class QpidBrokerTestCase extends QpidTestCase
protected String getBrokerCommand(int port) throws MalformedURLException
{
- final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port);
- return _brokerCommand
+ return _broker
.replace("@PORT", "" + port)
.replace("@SSL_PORT", "" + (port - 1))
.replace("@MPORT", "" + getManagementPort(port))
- .replace("@CONFIG_FILE", _configFile.toString())
- .replace("@EXCLUDES", protocolExcludesList);
- }
-
- public void startBroker() throws Exception
- {
- startBroker(0);
+ .replace("@CONFIG_FILE", _configFile.toString());
}
public void startBroker(int port) throws Exception
@@ -462,38 +443,38 @@ public class QpidBrokerTestCase extends QpidTestCase
saveTestConfiguration();
saveTestVirtualhosts();
- if(_brokers.get(port) != null)
- {
- throw new IllegalStateException("There is already an existing broker running on port " + port);
- }
-
- if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker())
+ Process process = null;
+ if (_broker.equals(VM))
{
+ setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port)));
setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false));
saveTestConfiguration();
-
- BrokerOptions options = new BrokerOptions();
- options.setConfigFile(_configFile.getAbsolutePath());
- options.addPort(port);
-
- addExcludedPorts(port, options);
-
- options.setJmxPort(getManagementPort(port));
-
- //Set the log config file, relying on the log4j.configuration system property
- //set on the JVM by the JUnit runner task in module.xml.
- options.setLogConfigFile(new URL(System.getProperty("log4j.configuration")).getFile());
-
- Broker broker = new Broker();
- _logger.info("starting internal broker (same JVM)");
- broker.startup(options);
-
- _brokers.put(port, new InternalBrokerHolder(broker));
+
+ // create an in_VM broker
+ final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile);
+ try
+ {
+ ApplicationRegistry.initialise(registry, port);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Broker initialise failed due to:",e);
+ try
+ {
+ registry.close();
+ }
+ catch (Throwable closeE)
+ {
+ closeE.printStackTrace();
+ }
+ throw e;
+ }
+ TransportConnection.createVMBroker(port);
}
- else if (!_brokerType.equals(BrokerType.EXTERNAL))
+ else if (!_broker.equals(EXTERNAL))
{
String cmd = getBrokerCommand(port);
- _logger.info("starting external broker: " + cmd);
+ _logger.info("starting broker: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+"));
pb.redirectErrorStream(true);
@@ -509,7 +490,7 @@ public class QpidBrokerTestCase extends QpidTestCase
// DON'T change PNAME, qpid.stop needs this value.
env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\"");
// Add the port to QPID_WORK to ensure unique working dirs for multi broker tests
- env.put("QPID_WORK", getQpidWork(_brokerType, port));
+ env.put("QPID_WORK", System.getProperty("QPID_WORK")+ "/" + port);
// Use the environment variable to set amqj.logging.level for the broker
@@ -561,7 +542,8 @@ public class QpidBrokerTestCase extends QpidTestCase
env.put("QPID_OPTS", QPID_OPTS);
}
}
- Process process = pb.start();;
+
+ process = pb.start();
Piper p = new Piper(process.getInputStream(),
_brokerOutputStream,
@@ -582,7 +564,6 @@ public class QpidBrokerTestCase extends QpidTestCase
try
{
- //test that the broker is still running and hasn't exited unexpectedly
int exit = process.exitValue();
_logger.info("broker aborted: " + exit);
cleanBroker();
@@ -590,58 +571,11 @@ public class QpidBrokerTestCase extends QpidTestCase
}
catch (IllegalThreadStateException e)
{
- // this is expect if the broker started successfully
+ // this is expect if the broker started succesfully
}
-
- _brokers.put(port, new SpawnedBrokerHolder(process));
}
- }
- private void addExcludedPorts(int port, BrokerOptions options)
- {
- final String protocolExcludesList = _brokerProtocolExcludes.replace("@PORT", "" + port);
-
- if (protocolExcludesList.equals(""))
- {
- return;
- }
- final String[] toks = protocolExcludesList.split("\\s");
-
- if(toks.length % 2 != 0)
- {
- throw new IllegalArgumentException("Must be an even number of tokens in '" + protocolExcludesList + "'");
- }
- for (int i = 0; i < toks.length; i=i+2)
- {
- String excludeArg = toks[i];
- final int excludedPort = Integer.parseInt(toks[i+1]);
- options.addExcludedPort(ProtocolExclusion.lookup(excludeArg), excludedPort);
-
- _logger.info("Adding protocol exclusion " + excludeArg + " " + excludedPort);
- }
- }
-
- private boolean existingInternalBroker()
- {
- for(BrokerHolder holder : _brokers.values())
- {
- if(holder instanceof InternalBrokerHolder)
- {
- return true;
- }
- }
-
- return false;
- }
-
- private String getQpidWork(BrokerType broker, int port)
- {
- if (!broker.equals(BrokerType.EXTERNAL))
- {
- return System.getProperty("QPID_WORK")+ "/" + port;
- }
-
- return System.getProperty("QPID_WORK");
+ _brokers.put(port, process);
}
public String getTestConfigFile()
@@ -722,17 +656,20 @@ public class QpidBrokerTestCase extends QpidTestCase
port = getPort(port);
_logger.info("stopping broker: " + getBrokerCommand(port));
- BrokerHolder broker = _brokers.remove(port);
- broker.shutdown();
+ Process process = _brokers.remove(port);
+ if (process != null)
+ {
+ process.destroy();
+ process.waitFor();
+ _logger.info("broker exited: " + process.exitValue());
+ }
+ else if (_broker.equals(VM))
+ {
+ TransportConnection.killVMBroker(port);
+ ApplicationRegistry.remove(port);
+ }
}
- public boolean isBrokerPresent(int port) throws Exception
- {
- port = getPort(port);
-
- return _brokers.containsKey(port);
- }
-
/**
* Attempt to set the Java Broker to use the BDBMessageStore for persistence
* Falling back to the DerbyMessageStore if
@@ -874,14 +811,20 @@ public class QpidBrokerTestCase extends QpidTestCase
}
/**
- * Set a System property for the client (and broker if using the same vm) of this test.
+ * Set a System (-D) property for the external Broker of this test.
*
* @param property The property to set
* @param value the value to set it to.
*/
protected void setTestClientSystemProperty(String property, String value)
{
- setTestSystemProperty(property, value);
+ if (!_propertiesSetForTestOnly.containsKey(property))
+ {
+ // Record the current value so we can revert it later.
+ _propertiesSetForTestOnly.put(property, System.getProperty(property));
+ }
+
+ System.setProperty(property, value);
}
/**
@@ -889,7 +832,20 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
protected void revertSystemProperties()
{
- revertTestSystemProperties();
+ for (String key : _propertiesSetForTestOnly.keySet())
+ {
+ String value = _propertiesSetForTestOnly.get(key);
+ if (value != null)
+ {
+ System.setProperty(key, value);
+ }
+ else
+ {
+ System.clearProperty(key);
+ }
+ }
+
+ _propertiesSetForTestOnly.clear();
// We don't change the current VMs settings for Broker only properties
// so we can just clear this map
@@ -948,17 +904,17 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
public boolean isBroker08()
{
- return _brokerVersion.equals(AmqpProtocolVersion.v0_8);
+ return _brokerVersion.equals(VERSION_08);
}
public boolean isBroker010()
{
- return _brokerVersion.equals(AmqpProtocolVersion.v0_10);
+ return _brokerVersion.equals(VERSION_010);
}
protected boolean isJavaBroker()
{
- return _brokerLanguage.equals("java") || _brokerType.equals("vm");
+ return _brokerLanguage.equals("java") || _broker.equals("vm");
}
protected boolean isCppBroker()
@@ -968,14 +924,9 @@ public class QpidBrokerTestCase extends QpidTestCase
protected boolean isExternalBroker()
{
- return !_brokerType.equals("vm"); //TODO
+ return !_broker.equals("vm");
}
-
- protected boolean isInternalBroker()
- {
- return _brokerType.equals(BrokerType.INTERNAL);
- }
-
+
protected boolean isBrokerStorePersistent()
{
return _brokerPersistent;
@@ -1047,6 +998,11 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException
{
+ if (_broker.equals(VM))
+ {
+ factoryName += ".vm";
+ }
+
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
@@ -1087,7 +1043,15 @@ public class QpidBrokerTestCase extends QpidTestCase
public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException
{
_logger.info("get Connection");
- Connection con = getConnectionFactory().createConnection(username, password, id);
+ Connection con;
+ if (_broker.equals(VM))
+ {
+ con = new AMQConnection("vm://:1", username, password, id, "test");
+ }
+ else
+ {
+ con = getConnectionFactory().createConnection(username, password, id);
+ }
//add the connection in the lis of connections
_connections.add(con);
return con;
@@ -1125,8 +1089,7 @@ public class QpidBrokerTestCase extends QpidTestCase
c.close();
}
}
- finally
- {
+ finally{
// Ensure any problems with close does not interfer with property resets
revertSystemProperties();
revertLoggingLevels();
@@ -1227,8 +1190,7 @@ public class QpidBrokerTestCase extends QpidTestCase
MessageProducer producer = session.createProducer(destination);
- int i = offset;
- for (; i < (count + offset); i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -1251,7 +1213,7 @@ public class QpidBrokerTestCase extends QpidTestCase
// we have no batchSize or
// our count is not divible by batchSize.
if (session.getTransacted() &&
- ( batchSize == 0 || (i-1) % batchSize != 0))
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -1346,26 +1308,29 @@ public class QpidBrokerTestCase extends QpidTestCase
*/
public void reloadBrokerSecurityConfig() throws Exception
{
- JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin");
- jmxu.open();
-
- try
+ if (_broker.equals(VM))
{
- ConfigurationManagement configMBean = jmxu.getConfigurationManagement();
- configMBean.reloadSecurityConfiguration();
+ ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections();
}
- finally
+ else
{
- jmxu.close();
- }
-
- LogMonitor _monitor = new LogMonitor(_outputFile);
- assertTrue("The expected server security configuration reload did not occur",
- _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT));
- }
+ JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin");
+ jmxu.open();
+
+ try
+ {
+ ConfigurationManagement configMBean = jmxu.getConfigurationManagement();
+ configMBean.reloadSecurityConfiguration();
+ }
+ finally
+ {
+ jmxu.close();
+ }
+
+ LogMonitor _monitor = new LogMonitor(_outputFile);
+ assertTrue("The expected server security configuration reload did not occur",
+ _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT));
- protected int getFailingPort()
- {
- return FAILING_PORT;
+ }
}
}