diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java | 1363 |
1 files changed, 1363 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java new file mode 100644 index 0000000000..6fcde7e185 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -0,0 +1,1363 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.utils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.management.common.mbeans.ConfigurationManagement; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.store.DerbyMessageStore; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.LogMonitor; + +/** + * Qpid base class for system testing test cases. + */ +public class QpidBrokerTestCase extends QpidTestCase +{ + protected final String QpidHome = System.getProperty("QPID_HOME"); + protected File _configFile = new File(System.getProperty("broker.config")); + + protected static final Logger _logger = Logger.getLogger(QpidBrokerTestCase.class); + protected static final int LOGMONITOR_TIMEOUT = 5000; + + protected long RECEIVE_TIMEOUT = 1000l; + + private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); + private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); + private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>(); + + private XMLConfiguration _testConfiguration = new XMLConfiguration(); + private XMLConfiguration _testVirtualhosts = new XMLConfiguration(); + + protected static final String INDEX = "index"; + protected static final String CONTENT = "content"; + + private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + static + { + String initialContext = System.getProperty(InitialContext.INITIAL_CONTEXT_FACTORY); + + if (initialContext == null || initialContext.length() == 0) + { + System.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT); + } + } + + // system properties + private static final String BROKER_LANGUAGE = "broker.language"; + private static final String BROKER = "broker"; + private static final String BROKER_CLEAN = "broker.clean"; + private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; + private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work"; + private static final String BROKER_VERSION = "broker.version"; + protected static final String BROKER_READY = "broker.ready"; + private static final String BROKER_STOPPED = "broker.stopped"; + private static final String TEST_OUTPUT = "test.output"; + private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave"; + private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; + private static final String BROKER_PERSITENT = "broker.persistent"; + + // values + protected static final String JAVA = "java"; + protected static final String CPP = "cpp"; + protected static final String VM = "vm"; + protected static final String EXTERNAL = "external"; + private static final String VERSION_08 = "0-8"; + private static final String VERSION_010 = "0-10"; + + protected static final String QPID_HOME = "QPID_HOME"; + + public static final int DEFAULT_VM_PORT = 1; + public static final int DEFAULT_PORT = Integer.getInteger("test.port", ServerConfiguration.DEFAULT_PORT); + public static final int DEFAULT_MANAGEMENT_PORT = Integer.getInteger("test.mport", ServerConfiguration.DEFAULT_JMXPORT); + public static final int DEFAULT_SSL_PORT = Integer.getInteger("test.sslport", ServerConfiguration.DEFAULT_SSL_PORT); + + protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); + protected String _broker = System.getProperty(BROKER, VM); + private String _brokerClean = System.getProperty(BROKER_CLEAN, null); + private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); + private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); + protected String _output = System.getProperty(TEST_OUTPUT); + protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); + + protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: "); + protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE); + + protected File _outputFile; + + protected PrintStream _brokerOutputStream; + + protected Map<Integer, Process> _brokers = new HashMap<Integer, Process>(); + + protected InitialContext _initialContext; + protected AMQConnectionFactory _connectionFactory; + + protected String _testName; + + // the connections created for a given test + protected List<Connection> _connections = new ArrayList<Connection>(); + public static final String QUEUE = "queue"; + public static final String TOPIC = "topic"; + + /** Map to hold test defined environment properties */ + private Map<String, String> _env; + + /** Ensure our messages have some sort of size */ + protected static final int DEFAULT_MESSAGE_SIZE = 1024; + + /** Size to create our message*/ + private int _messageSize = DEFAULT_MESSAGE_SIZE; + /** Type of message*/ + protected enum MessageType + { + BYTES, + MAP, + OBJECT, + STREAM, + TEXT + } + private MessageType _messageType = MessageType.TEXT; + + public QpidBrokerTestCase(String name) + { + super(name); + } + + public QpidBrokerTestCase() + { + super(); + } + + public Logger getLogger() + { + return QpidBrokerTestCase._logger; + } + + public void runBare() throws Throwable + { + _testName = getClass().getSimpleName() + "." + getName(); + String qname = getClass().getName() + "." + getName(); + + // Initialize this for each test run + _env = new HashMap<String, String>(); + + PrintStream oldOut = System.out; + PrintStream oldErr = System.err; + PrintStream out = null; + PrintStream err = null; + + boolean redirected = _output != null && _output.length() > 0; + if (redirected) + { + _outputFile = new File(String.format("%s/TEST-%s.out", _output, qname)); + out = new PrintStream(_outputFile); + err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname)); + System.setOut(out); + System.setErr(err); + + if (_interleaveBrokerLog) + { + _brokerOutputStream = out; + } + else + { + _brokerOutputStream = new PrintStream(new FileOutputStream(String + .format("%s/TEST-%s.broker.out", _output, qname)), true); + } + } + + _logger.info("========== start " + _testName + " =========="); + try + { + super.runBare(); + } + catch (Exception e) + { + _logger.error("exception", e); + throw e; + } + finally + { + try + { + stopBroker(); + } + catch (Exception e) + { + _logger.error("exception stopping broker", e); + } + + if(_brokerCleanBetweenTests) + { + try + { + cleanBroker(); + } + catch (Exception e) + { + _logger.error("exception cleaning up broker", e); + } + } + + _logger.info("========== stop " + _testName + " =========="); + + if (redirected) + { + System.setErr(oldErr); + System.setOut(oldOut); + err.close(); + out.close(); + if (!_interleaveBrokerLog) + { + _brokerOutputStream.close(); + } + } + } + } + + @Override + protected void setUp() throws Exception + { + if (!_configFile.exists()) + { + fail("Unable to test without config file:" + _configFile); + } + + String existingQpidWorkPath = System.getProperty(BROKER_EXISTING_QPID_WORK); + if(existingQpidWorkPath != null && !existingQpidWorkPath.equals("")) + { + cleanBroker(); + + File existing = new File(existingQpidWorkPath); + File qpidWork = new File(getQpidWork(_broker, getPort())); + FileUtils.copyRecursive(existing, qpidWork); + } + + startBroker(); + } + + private static final class Piper extends Thread + { + + private LineNumberReader in; + private PrintStream out; + private String ready; + private CountDownLatch latch; + private boolean seenReady; + private String stopped; + private String stopLine; + + public Piper(InputStream in, PrintStream out, String ready) + { + this(in, out, ready, null); + } + + public Piper(InputStream in, PrintStream out, String ready, String stopped) + { + this.in = new LineNumberReader(new InputStreamReader(in)); + this.out = out; + this.ready = ready; + this.stopped = stopped; + this.seenReady = false; + + if (this.ready != null && !this.ready.equals("")) + { + this.latch = new CountDownLatch(1); + } + else + { + this.latch = null; + } + } + + public Piper(InputStream in, PrintStream out) + { + this(in, out, null); + } + + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + if (latch == null) + { + return true; + } + else + { + latch.await(timeout, unit); + return seenReady; + } + } + + public void run() + { + try + { + String line; + while ((line = in.readLine()) != null) + { + if (_interleaveBrokerLog) + { + line = _brokerLogPrefix + line; + } + out.println(line); + + if (latch != null && line.contains(ready)) + { + seenReady = true; + latch.countDown(); + } + + if (!seenReady && line.contains(stopped)) + { + stopLine = line; + } + } + } + catch (IOException e) + { + // this seems to happen regularly even when + // exits are normal + } + finally + { + if (latch != null) + { + latch.countDown(); + } + } + } + + public String getStopLine() + { + return stopLine; + } + } + + public void startBroker() throws Exception + { + startBroker(0); + } + + /** + * Return the management portin use by the broker on this main port + * + * @param mainPort the broker's main port. + * + * @return the management port that corresponds to the broker on the given port + */ + protected int getManagementPort(int mainPort) + { + return mainPort + (DEFAULT_MANAGEMENT_PORT - (_broker.equals(VM) ? DEFAULT_VM_PORT : DEFAULT_PORT)); + } + + /** + * Get the Port that is use by the current broker + * + * @return the current port + */ + protected int getPort() + { + return getPort(0); + } + + protected int getPort(int port) + { + if (_broker.equals(VM)) + { + return port == 0 ? DEFAULT_VM_PORT : port; + } + else if (!_broker.equals(EXTERNAL)) + { + return port == 0 ? DEFAULT_PORT : port; + } + else + { + return port; + } + } + + protected String getBrokerCommand(int port) throws MalformedURLException + { + return _broker + .replace("@PORT", "" + port) + .replace("@SSL_PORT", "" + (port - 1)) + .replace("@MPORT", "" + getManagementPort(port)) + .replace("@CONFIG_FILE", _configFile.toString()); + } + + public void startBroker(int port) throws Exception + { + port = getPort(port); + + // Save any configuration changes that have been made + saveTestConfiguration(); + saveTestVirtualhosts(); + + Process process = null; + if (_broker.equals(VM)) + { + setConfigurationProperty("management.jmxport", String.valueOf(getManagementPort(port))); + setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); + saveTestConfiguration(); + + // create an in_VM broker + final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile); + try + { + ApplicationRegistry.initialise(registry, port); + } + catch (Exception e) + { + _logger.error("Broker initialise failed due to:",e); + try + { + registry.close(); + } + catch (Throwable closeE) + { + closeE.printStackTrace(); + } + throw e; + } + TransportConnection.createVMBroker(port); + } + else if (!_broker.equals(EXTERNAL)) + { + String cmd = getBrokerCommand(port); + _logger.info("starting broker: " + cmd); + ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+")); + pb.redirectErrorStream(true); + + Map<String, String> env = pb.environment(); + + String qpidHome = System.getProperty(QPID_HOME); + env.put(QPID_HOME, qpidHome); + + //Augment Path with bin directory in QPID_HOME. + env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin")); + + //Add the test name to the broker run. + // DON'T change PNAME, qpid.stop needs this value. + env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\""); + // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests + env.put("QPID_WORK", getQpidWork(_broker, port)); + + + // Use the environment variable to set amqj.logging.level for the broker + // The value used is a 'server' value in the test configuration to + // allow a differentiation between the client and broker logging levels. + if (System.getProperty("amqj.server.logging.level") != null) + { + setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level")); + } + + // Add all the environment settings the test requested + if (!_env.isEmpty()) + { + for (Map.Entry<String, String> entry : _env.entrySet()) + { + env.put(entry.getKey(), entry.getValue()); + } + } + + + // Add default test logging levels that are used by the log4j-test + // Use the convenience methods to push the current logging setting + // in to the external broker's QPID_OPTS string. + if (System.getProperty("amqj.protocol.logging.level") != null) + { + setSystemProperty("amqj.protocol.logging.level"); + } + if (System.getProperty("root.logging.level") != null) + { + setSystemProperty("root.logging.level"); + } + + + String QPID_OPTS = " "; + // Add all the specified system properties to QPID_OPTS + if (!_propertiesSetForBroker.isEmpty()) + { + for (String key : _propertiesSetForBroker.keySet()) + { + QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " "; + } + + if (env.containsKey("QPID_OPTS")) + { + env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS); + } + else + { + env.put("QPID_OPTS", QPID_OPTS); + } + } + + process = pb.start(); + + Piper p = new Piper(process.getInputStream(), + _brokerOutputStream, + System.getProperty(BROKER_READY), + System.getProperty(BROKER_STOPPED)); + + p.start(); + + if (!p.await(30, TimeUnit.SECONDS)) + { + _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine()); + //Ensure broker has stopped + process.destroy(); + cleanBroker(); + throw new RuntimeException("broker failed to become ready:" + + p.getStopLine()); + } + + try + { + int exit = process.exitValue(); + _logger.info("broker aborted: " + exit); + cleanBroker(); + throw new RuntimeException("broker aborted: " + exit); + } + catch (IllegalThreadStateException e) + { + // this is expect if the broker started succesfully + } + } + + _brokers.put(port, process); + } + + private String getQpidWork(String broker, int port) + { + if (broker.equals(VM)) + { + return System.getProperty("QPID_WORK"); + } + else if (!broker.equals(EXTERNAL)) + { + return System.getProperty("QPID_WORK")+ "/" + port; + } + + return System.getProperty("QPID_WORK"); + } + + public String getTestConfigFile() + { + String path = _output == null ? System.getProperty("java.io.tmpdir") : _output; + return path + "/" + getTestQueueName() + "-config.xml"; + } + + public String getTestVirtualhostsFile() + { + String path = _output == null ? System.getProperty("java.io.tmpdir") : _output; + return path + "/" + getTestQueueName() + "-virtualhosts.xml"; + } + + protected void saveTestConfiguration() throws ConfigurationException + { + // Specifiy the test config file + String testConfig = getTestConfigFile(); + setSystemProperty("test.config", testConfig); + + // Create the file if configuration does not exist + if (_testConfiguration.isEmpty()) + { + _testConfiguration.addProperty("__ignore", "true"); + } + _testConfiguration.save(testConfig); + } + + protected void saveTestVirtualhosts() throws ConfigurationException + { + // Specifiy the test virtualhosts file + String testVirtualhosts = getTestVirtualhostsFile(); + setSystemProperty("test.virtualhosts", testVirtualhosts); + + // Create the file if configuration does not exist + if (_testVirtualhosts.isEmpty()) + { + _testVirtualhosts.addProperty("__ignore", "true"); + } + _testVirtualhosts.save(testVirtualhosts); + } + + public void cleanBroker() + { + if (_brokerClean != null) + { + _logger.info("clean: " + _brokerClean); + + try + { + ProcessBuilder pb = new ProcessBuilder(_brokerClean.split("\\s+")); + pb.redirectErrorStream(true); + Process clean = pb.start(); + new Piper(clean.getInputStream(),_brokerOutputStream).start(); + + clean.waitFor(); + + _logger.info("clean exited: " + clean.exitValue()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + public void stopBroker() throws Exception + { + stopBroker(0); + } + + public void stopBroker(int port) throws Exception + { + port = getPort(port); + + _logger.info("stopping broker: " + getBrokerCommand(port)); + Process process = _brokers.remove(port); + if (process != null) + { + process.destroy(); + process.waitFor(); + _logger.info("broker exited: " + process.exitValue()); + } + else if (_broker.equals(VM)) + { + TransportConnection.killVMBroker(port); + ApplicationRegistry.remove(port); + } + } + + /** + * Attempt to set the Java Broker to use the BDBMessageStore for persistence + * Falling back to the DerbyMessageStore if + * + * @param virtualhost - The virtualhost to modify + * + * @throws ConfigurationException - when reading/writing existing configuration + * @throws IOException - When creating a temporary file. + */ + protected void makeVirtualHostPersistent(String virtualhost) + throws ConfigurationException, IOException + { + Class<?> storeClass = null; + try + { + // Try and lookup the BDB class + storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore"); + } + catch (ClassNotFoundException e) + { + // No BDB store, we'll use Derby instead. + storeClass = DerbyMessageStore.class; + } + + + setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class", + storeClass.getName()); + setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + DerbyMessageStore.ENVIRONMENT_PATH_PROPERTY, + "${QPID_WORK}/" + virtualhost); + } + + /** + * Get a property value from the current configuration file. + * + * @param property the property to lookup + * + * @return the requested String Value + * + * @throws org.apache.commons.configuration.ConfigurationException + * + */ + protected String getConfigurationStringProperty(String property) throws ConfigurationException + { + // Call save Configuration to be sure we have saved the test specific + // file. As the optional status + saveTestConfiguration(); + saveTestVirtualhosts(); + + ServerConfiguration configuration = new ServerConfiguration(_configFile); + // Don't need to configuration.configure() here as we are just pulling + // values directly by String. + return configuration.getConfig().getString(property); + } + + /** + * Set a configuration Property for this test run. + * + * This creates a new configuration based on the current configuration + * with the specified property change. + * + * Multiple calls to this method will result in multiple temporary + * configuration files being created. + * + * @param property the configuration property to set + * @param value the new value + * + * @throws ConfigurationException when loading the current config file + * @throws IOException when writing the new config file + */ + protected void setConfigurationProperty(String property, String value) + throws ConfigurationException, IOException + { + // Choose which file to write the property to based on prefix. + if (property.startsWith("virtualhosts")) + { + _testVirtualhosts.setProperty(StringUtils.substringAfter(property, "virtualhosts."), value); + } + else + { + _testConfiguration.setProperty(property, value); + } + } + + /** + * Set a System property that is to be applied only to the external test + * broker. + * + * This is a convenience method to enable the setting of a -Dproperty=value + * entry in QPID_OPTS + * + * This is only useful for the External Java Broker tests. + * + * @param property the property name + * @param value the value to set the property to + */ + protected void setBrokerOnlySystemProperty(String property, String value) + { + if (!_propertiesSetForBroker.containsKey(property)) + { + _propertiesSetForBroker.put(property, value); + } + + } + + /** + * Set a System (-D) property for this test run. + * + * This convenience method copies the current VMs System Property + * for the external VM Broker. + * + * @param property the System property to set + */ + protected void setSystemProperty(String property) + { + setSystemProperty(property, System.getProperty(property)); + } + + /** + * Set a System property for the duration of this test. + * + * When the test run is complete the value will be reverted. + * + * The values set using this method will also be propogated to the external + * Java Broker via a -D value defined in QPID_OPTS. + * + * If the value should not be set on the broker then use + * setTestClientSystemProperty(). + * + * @param property the property to set + * @param value the new value to use + */ + protected void setSystemProperty(String property, String value) + { + // Record the value for the external broker + _propertiesSetForBroker.put(property, value); + + //Set the value for the test client vm aswell. + setTestClientSystemProperty(property, value); + } + + /** + * Set a System (-D) property for the external Broker of this test. + * + * @param property The property to set + * @param value the value to set it to. + */ + protected void setTestClientSystemProperty(String property, String value) + { + if (!_propertiesSetForTestOnly.containsKey(property)) + { + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); + } + + /** + * Restore the System property values that were set before this test run. + */ + protected void revertSystemProperties() + { + for (String key : _propertiesSetForTestOnly.keySet()) + { + String value = _propertiesSetForTestOnly.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + + _propertiesSetForTestOnly.clear(); + + // We don't change the current VMs settings for Broker only properties + // so we can just clear this map + _propertiesSetForBroker.clear(); + } + + /** + * Add an environtmen variable for the external broker environment + * + * @param property the property to set + * @param value the value to set it to + */ + protected void setBrokerEnvironment(String property, String value) + { + _env.put(property, value); + } + + /** + * Adjust the VMs Log4j Settings just for this test run + * + * @param logger the logger to change + * @param level the level to set + */ + protected void setLoggerLevel(Logger logger, Level level) + { + assertNotNull("Cannot set level of null logger", logger); + assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level); + + if (!_loggerLevelSetForTest.containsKey(logger)) + { + // Record the current value so we can revert it later. + _loggerLevelSetForTest.put(logger, logger.getLevel()); + } + + logger.setLevel(level); + } + + /** + * Restore the logging levels defined by this test. + */ + protected void revertLoggingLevels() + { + for (Logger logger : _loggerLevelSetForTest.keySet()) + { + logger.setLevel(_loggerLevelSetForTest.get(logger)); + } + + _loggerLevelSetForTest.clear(); + + } + + /** + * Check whether the broker is an 0.8 + * + * @return true if the broker is an 0_8 version, false otherwise. + */ + public boolean isBroker08() + { + return _brokerVersion.equals(VERSION_08); + } + + public boolean isBroker010() + { + return _brokerVersion.equals(VERSION_010); + } + + protected boolean isJavaBroker() + { + return _brokerLanguage.equals("java") || _broker.equals("vm"); + } + + protected boolean isCppBroker() + { + return _brokerLanguage.equals("cpp"); + } + + protected boolean isExternalBroker() + { + return !_broker.equals("vm"); + } + + protected boolean isBrokerStorePersistent() + { + return _brokerPersistent; + } + + public void restartBroker() throws Exception + { + restartBroker(0); + } + + public void restartBroker(int port) throws Exception + { + stopBroker(port); + startBroker(port); + } + + /** + * we assume that the environment is correctly set + * i.e. -Djava.naming.provider.url="..//example010.properties" + * TODO should be a way of setting that through maven + * + * @return an initial context + * + * @throws NamingException if there is an error getting the context + */ + public InitialContext getInitialContext() throws NamingException + { + _logger.info("get InitialContext"); + if (_initialContext == null) + { + _initialContext = new InitialContext(); + } + return _initialContext; + } + + /** + * Get the default connection factory for the currently used broker + * Default factory is "local" + * + * @return A conection factory + * + * @throws Exception if there is an error getting the tactory + */ + public AMQConnectionFactory getConnectionFactory() throws NamingException + { + _logger.info("get ConnectionFactory"); + if (_connectionFactory == null) + { + if (Boolean.getBoolean("profile.use_ssl")) + { + _connectionFactory = getConnectionFactory("default.ssl"); + } + else + { + _connectionFactory = getConnectionFactory("default"); + } + } + return _connectionFactory; + } + + /** + * Get a connection factory for the currently used broker + * + * @param factoryName The factory name + * + * @return A conection factory + * + * @throws Exception if there is an error getting the tactory + */ + public AMQConnectionFactory getConnectionFactory(String factoryName) throws NamingException + { + if (_broker.equals(VM)) + { + factoryName += ".vm"; + } + + return (AMQConnectionFactory) getInitialContext().lookup(factoryName); + } + + public Connection getConnection() throws JMSException, NamingException + { + return getConnection("guest", "guest"); + } + + public Connection getConnection(ConnectionURL url) throws JMSException + { + _logger.info(url.getURL()); + Connection connection = new AMQConnectionFactory(url).createConnection(url.getUsername(), url.getPassword()); + + _connections.add(connection); + + return connection; + } + + /** + * Get a connection (remote or in-VM) + * + * @param username The user name + * @param password The user password + * + * @return a newly created connection + * + * @throws Exception if there is an error getting the connection + */ + public Connection getConnection(String username, String password) throws JMSException, NamingException + { + _logger.info("get connection"); + Connection con = getConnectionFactory().createConnection(username, password); + //add the connection in the lis of connections + _connections.add(con); + return con; + } + + public Connection getClientConnection(String username, String password, String id) throws JMSException, URLSyntaxException, AMQException, NamingException + { + _logger.info("get Connection"); + Connection con; + if (_broker.equals(VM)) + { + con = new AMQConnection("vm://:1", username, password, id, "test"); + } + else + { + con = getConnectionFactory().createConnection(username, password, id); + } + //add the connection in the lis of connections + _connections.add(con); + return con; + } + + /** + * Return a uniqueName for this test. + * In this case it returns a queue Named by the TestCase and TestName + * + * @return String name for a queue + */ + protected String getTestQueueName() + { + return getClass().getSimpleName() + "-" + getName(); + } + + /** + * Return a Queue specific for this test. + * Uses getTestQueueName() as the name of the queue + * @return + */ + public Queue getTestQueue() + { + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName()); + } + + + protected void tearDown() throws java.lang.Exception + { + try + { + // close all the connections used by this test. + for (Connection c : _connections) + { + c.close(); + } + } + finally{ + // Ensure any problems with close does not interfer with property resets + revertSystemProperties(); + revertLoggingLevels(); + } + } + + /** + * Consume all the messages in the specified queue. Helper to ensure + * persistent tests don't leave data behind. + * + * @param queue the queue to purge + * + * @return the count of messages drained + * + * @throws Exception if a problem occurs + */ + protected int drainQueue(Queue queue) throws Exception + { + Connection connection = getConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + int count = 0; + while (consumer.receive(1000) != null) + { + count++; + } + + connection.close(); + + return count; + } + + /** + * Send messages to the given destination. + * + * If session is transacted then messages will be commited before returning + * + * @param session the session to use for sending + * @param destination where to send them to + * @param count no. of messages to send + * + * @return the sent messges + * + * @throws Exception + */ + public List<Message> sendMessage(Session session, Destination destination, + int count) throws Exception + { + return sendMessage(session, destination, count, 0, 0); + } + + /** + * Send messages to the given destination. + * + * If session is transacted then messages will be commited before returning + * + * @param session the session to use for sending + * @param destination where to send them to + * @param count no. of messages to send + * + * @param batchSize the batchSize in which to commit, 0 means no batching, + * but a single commit at the end + * @return the sent messgse + * + * @throws Exception + */ + public List<Message> sendMessage(Session session, Destination destination, + int count, int batchSize) throws Exception + { + return sendMessage(session, destination, count, 0, batchSize); + } + + /** + * Send messages to the given destination. + * + * If session is transacted then messages will be commited before returning + * + * @param session the session to use for sending + * @param destination where to send them to + * @param count no. of messages to send + * + * @param offset offset allows the INDEX value of the message to be adjusted. + * @param batchSize the batchSize in which to commit, 0 means no batching, + * but a single commit at the end + * @return the sent messgse + * + * @throws Exception + */ + public List<Message> sendMessage(Session session, Destination destination, + int count, int offset, int batchSize) throws Exception + { + List<Message> messages = new ArrayList<Message>(count); + + MessageProducer producer = session.createProducer(destination); + + int i = offset; + for (; i < (count + offset); i++) + { + Message next = createNextMessage(session, i); + + producer.send(next); + + if (session.getTransacted() && batchSize > 0) + { + if (i % batchSize == 0) + { + session.commit(); + } + + } + + messages.add(next); + } + + // Ensure we commit the last messages + // Commit the session if we are transacted and + // we have no batchSize or + // our count is not divible by batchSize. + if (session.getTransacted() && + ( batchSize == 0 || (i-1) % batchSize != 0)) + { + session.commit(); + } + + return messages; + } + + public Message createNextMessage(Session session, int msgCount) throws JMSException + { + Message message = createMessage(session, _messageSize); + message.setIntProperty(INDEX, msgCount); + + return message; + + } + + public Message createMessage(Session session, int messageSize) throws JMSException + { + String payload = new String(new byte[messageSize]); + + Message message; + + switch (_messageType) + { + case BYTES: + message = session.createBytesMessage(); + ((BytesMessage) message).writeUTF(payload); + break; + case MAP: + message = session.createMapMessage(); + ((MapMessage) message).setString(CONTENT, payload); + break; + default: // To keep the compiler happy + case TEXT: + message = session.createTextMessage(); + ((TextMessage) message).setText(payload); + break; + case OBJECT: + message = session.createObjectMessage(); + ((ObjectMessage) message).setObject(payload); + break; + case STREAM: + message = session.createStreamMessage(); + ((StreamMessage) message).writeString(payload); + break; + } + + return message; + } + + protected int getMessageSize() + { + return _messageSize; + } + + protected void setMessageSize(int byteSize) + { + _messageSize = byteSize; + } + + public ConnectionURL getConnectionURL() throws NamingException + { + return getConnectionFactory().getConnectionURL(); + } + + public BrokerDetails getBroker() + { + try + { + if (getConnectionFactory().getConnectionURL().getBrokerCount() > 0) + { + return getConnectionFactory().getConnectionURL().getBrokerDetails(0); + } + else + { + fail("No broker details are available."); + } + } + catch (NamingException e) + { + fail(e.getMessage()); + } + + //keep compiler happy + return null; + } + + /** + * Reloads the broker security configuration using the ApplicationRegistry (InVM brokers) or the + * ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be + * enabled before calling the method). + */ + public void reloadBrokerSecurityConfig() throws Exception + { + if (_broker.equals(VM)) + { + ApplicationRegistry.getInstance().getConfiguration().reparseConfigFileSecuritySections(); + } + else + { + JMXTestUtils jmxu = new JMXTestUtils(this, "admin" , "admin"); + jmxu.open(); + + try + { + ConfigurationManagement configMBean = jmxu.getConfigurationManagement(); + configMBean.reloadSecurityConfiguration(); + } + finally + { + jmxu.close(); + } + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected server security configuration reload did not occur", + _monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT)); + + } + } +} |