diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-03 01:49:08 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-03 01:49:08 +0000 |
commit | e0a7ba7e5f7e6795a083b2219e295293be5714a8 (patch) | |
tree | 1aa2950ce9209fbc6ed65592fef98e78ec9ada9e | |
parent | ffb25827f64be630818220eb267f95d38af75ca7 (diff) | |
download | qpid-python-e0a7ba7e5f7e6795a083b2219e295293be5714a8.tar.gz |
QPID-2811, QPID-2812, QPID-2813, QPID-2814, QPID-2816: Network layer refactoring
* Refactor Qpid network layer to share common interfaces between client and broker
* Added VM networking using Mina for 0-10 protocol
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20110301@1076489 13f79535-47bb-0310-9956-ffa450edef68
93 files changed, 2659 insertions, 4571 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index a6f319cb1f..3a562726a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1203,7 +1203,7 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public String getAddress() { - return _obj.getAddress(); + return _obj.getRemoteAddress().toString(); } public Boolean getIncoming() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4f86c82578..b9c558fc3f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1395,7 +1395,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public String getSessionName() { - return getConnectionConfig().getAddress() + "/" + getChannelId(); + return getConnectionConfig().getRemoteAddress().toString() + "/" + getChannelId(); } public long getCreateTime() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java new file mode 100644 index 0000000000..7ff4c8b3c7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerInstance.java @@ -0,0 +1,350 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import java.io.File; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.log4j.PropertyConfigurator; +import org.apache.log4j.xml.QpidLog4JConfigurator; +import org.apache.qpid.BrokerOptions; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; +import org.apache.qpid.server.information.management.ServerInformationMBean; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.management.LoggingManagementMBean; +import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.protocol.BrokerReceiverFactory; +import org.apache.qpid.server.protocol.BrokerReceiverFactory.VERSION; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.Transport; + +public class BrokerInstance +{ + private static Logger _logger; + + public class InitException extends Exception + { + InitException(String msg, Throwable cause) + { + super(msg, cause); + } + + InitException(String msg) + { + super(msg); + } + } + + public void shutdown() + { + ApplicationRegistry.remove(); + } + + public void startup(BrokerOptions options) throws Exception + { + //if the -Dlog4j.configuration property has not been set, enable the init override + //to stop Log4J wondering off and picking up the first log4j.xml/properties file it + //finds from the classpath when we get the first Loggers + if (System.getProperty("log4j.configuration") == null) + { + System.setProperty("log4j.defaultInitOverride", "true"); + } + _logger = Logger.getLogger(BrokerInstance.class); + CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); + + String home = System.getProperty(BrokerOptions.QPID_HOME); + File defaultConfigFile = new File(home, BrokerOptions.DEFAULT_CONFIG_FILE); + File configFile = new File(options.getValue(BrokerOptions.CONFIG, defaultConfigFile.getPath())); + if (!configFile.exists()) + { + String error = "File " + configFile + " could not be found. Check the file exists and is readable."; + if (home == null) + { + error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set."; + } + + throw new InitException(error, null); + } + else + { + CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); + } + + + String watch = options.getValue(BrokerOptions.WATCH); + int logWatchTime = 0; + try + { + logWatchTime = Integer.parseInt(watch); + } + catch (NumberFormatException e) + { + System.err.println("Log watch configuration value of " + watch + " is invalid. Must be " + + "a non-negative integer. Using default of zero (no watching configured"); + } + + String log4j = options.getValue(BrokerOptions.LOG4J, System.getProperty("log4j.configuration")); + File logConfigFile; + if (log4j != null) + { + logConfigFile = new File(log4j); + configureLogging(logConfigFile, logWatchTime); + } + else + { + File configFileDirectory = configFile.getParentFile(); + logConfigFile = new File(configFileDirectory, BrokerOptions.DEFAULT_LOG_CONFIG_FILENAME); + configureLogging(logConfigFile, logWatchTime); + } + + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); + ServerConfiguration serverConfig = config.getConfiguration(); + + String management = options.getValue(BrokerOptions.MANAGEMENT); + updateManagementPort(serverConfig, management); + + // Application registry initialise + ApplicationRegistry.initialise(config); + + // We have already loaded the BrokerMessages class by this point so we + // need to refresh the locale setting in case we had a different value in + // the configuration. + BrokerMessages.reload(); + + // AR.initialise() sets and removes its own actor so we now need to set the actor + // for the remainder of the startup, and the default actor if the stack is empty + CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); + CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); + GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); + + try + { + configureLoggingManagementMBean(logConfigFile, logWatchTime); + + ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); + configMBean.register(); + + ServerInformationMBean sysInfoMBean = new ServerInformationMBean(QpidProperties.BUILD_VERSION_PROPERTY, QpidProperties.RELEASE_VERSION_PROPERTY); + sysInfoMBean.register(); + + Set<Integer> ports = new HashSet<Integer>(); + Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); + Set<Integer> exclude_0_9 = new HashSet<Integer>(); + Set<Integer> exclude_0_8 = new HashSet<Integer>(); + parsePortList(ports, options.get(BrokerOptions.PORTS, serverConfig.getPorts())); + parsePortList(exclude_0_10, options.get(BrokerOptions.EXCLUDE_0_10, serverConfig.getPortExclude010())); + parsePortList(exclude_0_9_1, options.get(BrokerOptions.EXCLUDE_0_9_1, serverConfig.getPortExclude091())); + parsePortList(exclude_0_9, options.get(BrokerOptions.EXCLUDE_0_9, serverConfig.getPortExclude09())); + parsePortList(exclude_0_8, options.get(BrokerOptions.EXCLUDE_0_8, serverConfig.getPortExclude08())); + + String protocol = options.getValue(BrokerOptions.PROTOCOL, "tcp"); + String bind = options.getValue(BrokerOptions.BIND); + if (bind == null) + { + bind = serverConfig.getBind(); + } + InetAddress address = null; + + if (bind.equals("*")) + { + address = new InetSocketAddress(0).getAddress(); + } + else + { + address = InetAddress.getByName(bind); + } + String host = address.getCanonicalHostName(); + + ConnectionSettings settings = new ConnectionSettings(); + settings.setProtocol(protocol); + settings.setHost(bind); + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + + if (!serverConfig.getSSLOnly()) + { + for (int port : ports) + { + IncomingNetworkTransport transport = Transport.getIncomingTransport(); + + Set<VERSION> supported = EnumSet.allOf(VERSION.class); + if (exclude_0_10.contains(port)) + { + supported.remove(VERSION.v0_10); + } + if (exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } + if (exclude_0_9.contains(port)) + { + supported.remove(VERSION.v0_9); + } + if (exclude_0_8.contains(port)) + { + supported.remove(VERSION.v0_8); + } + + settings.setPort(port); + + ReceiverFactory factory = new BrokerReceiverFactory(host, supported); + transport.accept(settings, factory, sslFactory); + + ApplicationRegistry.getInstance().registerTransport(port, transport); + CurrentActor.get().message(BrokerMessages.LISTENING(protocol.toUpperCase(), port)); + } + } + + if (serverConfig.getEnableSSL()) + { + IncomingNetworkTransport transport = Transport.getIncomingTransport(); + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + settings.setPort(serverConfig.getSSLPort()); + + ReceiverFactory factory = new BrokerReceiverFactory(host, EnumSet.allOf(VERSION.class)); + transport.accept(settings, factory, sslFactory); + + ApplicationRegistry.getInstance().registerTransport(serverConfig.getSSLPort(), transport); + CurrentActor.get().message(BrokerMessages.LISTENING(protocol.toUpperCase() + "/SSL", serverConfig.getSSLPort())); + } + + CurrentActor.get().message(BrokerMessages.READY()); + } + finally + { + // Startup is complete so remove the AR initialised Startup actor + CurrentActor.remove(); + } + } + + + private void configureLogging(File logConfigFile, int logWatchTime) throws Exception + { + if (logConfigFile.exists() && logConfigFile.canRead()) + { + CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); + + try + { + if (logWatchTime > 0) + { + System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + + logWatchTime + " seconds"); + // log4j expects the watch interval in milliseconds + QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); + } + else + { + QpidLog4JConfigurator.configure(logConfigFile.getPath()); + } + } + catch (Exception e) + { + throw new InitException(e.getMessage(), e); + } + } + else + { + System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); + System.err.println("Using the fallback internal log4j.properties configuration"); + + InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); + if (propsFile == null) + { + throw new InitException("Unable to load the fallback internal log4j.properties configuration file"); + } + else + { + Properties fallbackProps = new Properties(); + fallbackProps.load(propsFile); + PropertyConfigurator.configure(fallbackProps); + } + } + } + + private void parsePortList(Set<Integer> output, List<String> input) throws InitException + { + if (input != null) + { + for (String port : input) + { + try + { + output.add(Integer.parseInt(String.valueOf(port))); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + port, e); + } + } + } + } + + /** + * Update the configuration data with the management port. + * @param configuration + * @param managementPort The string from the command line + */ + private void updateManagementPort(ServerConfiguration configuration, String managementPort) + { + if (managementPort != null) + { + try + { + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); + } + catch (NumberFormatException e) + { + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); + } + } + } + + private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception + { + LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); + + blm.register(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 71cf17ed60..7e1f6ede72 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,17 +20,6 @@ */ package org.apache.qpid.server; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -38,86 +27,17 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.xml.QpidLog4JConfigurator; +import org.apache.qpid.BrokerOptions; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; -import org.apache.qpid.server.information.management.ServerInformationMBean; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.management.LoggingManagementMBean; -import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** - * Main entry point for AMQPD. - * + * Main entry point for Qpid broker. */ public class Main { - private static Logger _logger; - - private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; - - public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - public static final String QPID_HOME = "QPID_HOME"; - private static final int IPV4_ADDRESS_LENGTH = 4; - - private static final char IPV4_LITERAL_SEPARATOR = '.'; - - protected static class InitException extends Exception - { - InitException(String msg, Throwable cause) - { - super(msg, cause); - } - } - - protected final Options options = new Options(); - protected CommandLine commandLine; - - protected Main(String[] args) - { - setOptions(options); - if (parseCommandline(args)) - { - execute(); - } - } - - protected boolean parseCommandline(String[] args) - { - try - { - commandLine = new PosixParser().parse(options, args); - - return true; - } - catch (ParseException e) - { - System.err.println("Error: " + e.getMessage()); - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("Qpid", options, true); - - return false; - } - } - @SuppressWarnings("static-access") - protected void setOptions(Options options) + private static Options getOptions() { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); @@ -165,13 +85,14 @@ public class Main Option logconfig = OptionBuilder.withArgName("logconfig").hasArg() .withDescription("use the specified log4j xml configuration file. By " - + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg() .withDescription("monitor the log file configuration file for changes. Units are seconds. " + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); + Options options = new Options(); options.addOption(help); options.addOption(version); options.addOption(configFile); @@ -184,435 +105,71 @@ public class Main options.addOption(exclude0_8); options.addOption(mport); options.addOption(bind); + return options; } - protected void execute() - { - // note this understands either --help or -h. If an option only has a long name you can use that but if - // an option has a short name and a long name you must use the short name here. - if (commandLine.hasOption("h")) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("Qpid", options, true); - } - else if (commandLine.hasOption("v")) - { - String ver = QpidProperties.getVersionString(); - - StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); - - boolean first = true; - for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) - { - if (first) - { - first = false; - } - else - { - protocol.append(", "); - } - - protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); - - } - - System.out.println(ver + " (" + protocol + ")"); - } - else - { - try - { - CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); - startup(); - CurrentActor.remove(); - } - catch (InitException e) - { - System.out.println("Initialisation Error : " + e.getMessage()); - shutdown(1); - } - catch (Throwable e) - { - System.out.println("Error initialising message broker: " + e); - e.printStackTrace(); - shutdown(1); - } - } - } - - protected void shutdown(int status) - { - ApplicationRegistry.removeAll(); - System.exit(status); - } - - protected void startup() throws Exception + public static void main(String[] args) { - final String QpidHome = System.getProperty(QPID_HOME); - final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); - final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); - if (!configFile.exists()) - { - String error = "File " + configFile + " could not be found. Check the file exists and is readable."; - - if (QpidHome == null) - { - error = error + "\nNote: " + QPID_HOME + " is not set."; - } - - throw new InitException(error, null); - } - else - { - CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); - } - - String logConfig = commandLine.getOptionValue("l"); - String logWatchConfig = commandLine.getOptionValue("w", "0"); - - int logWatchTime = 0; + Options options = getOptions(); try { - logWatchTime = Integer.parseInt(logWatchConfig); - } - catch (NumberFormatException e) - { - System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " - + "a non-negative integer. Using default of zero (no watching configured"); - } - - File logConfigFile; - if (logConfig != null) - { - logConfigFile = new File(logConfig); - configureLogging(logConfigFile, logWatchTime); - } - else - { - File configFileDirectory = configFile.getParentFile(); - logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME); - configureLogging(logConfigFile, logWatchTime); - } - - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - ServerConfiguration serverConfig = config.getConfiguration(); - updateManagementPort(serverConfig, commandLine.getOptionValue("m")); - - ApplicationRegistry.initialise(config); - - // We have already loaded the BrokerMessages class by this point so we - // need to refresh the locale setting incase we had a different value in - // the configuration. - BrokerMessages.reload(); - - // AR.initialise() sets and removes its own actor so we now need to set the actor - // for the remainder of the startup, and the default actor if the stack is empty - CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); - CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); - GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); + CommandLine commandLine = new PosixParser().parse(options, args); - - try - { - configureLoggingManagementMBean(logConfigFile, logWatchTime); - - ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); - configMBean.register(); - - ServerInformationMBean sysInfoMBean = - new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); - sysInfoMBean.register(); - - - String[] portStr = commandLine.getOptionValues("p"); - - Set<Integer> ports = new HashSet<Integer>(); - Set<Integer> exclude_0_10 = new HashSet<Integer>(); - Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); - Set<Integer> exclude_0_9 = new HashSet<Integer>(); - Set<Integer> exclude_0_8 = new HashSet<Integer>(); - - if(portStr == null || portStr.length == 0) - { - - parsePortList(ports, serverConfig.getPorts()); - parsePortList(exclude_0_10, serverConfig.getPortExclude010()); - parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); - parsePortList(exclude_0_9, serverConfig.getPortExclude09()); - parsePortList(exclude_0_8, serverConfig.getPortExclude08()); - - } - else - { - parsePortArray(ports, portStr); - parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); - parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); - parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); - parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); - - } - - - - - String bindAddr = commandLine.getOptionValue("b"); - if (bindAddr == null) - { - bindAddr = serverConfig.getBind(); - } - InetAddress bindAddress = null; - - - - if (bindAddr.equals("wildcard")) + // note this understands either --help or -h. If an option only has a + // long name you can use that but if an option has a short name and a + // long name you must use the short name here. + if (commandLine.hasOption("h")) { - bindAddress = new InetSocketAddress(0).getAddress(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Qpid", options, true); } - else + else if (commandLine.hasOption("v")) { - bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); - } - - String hostName = bindAddress.getCanonicalHostName(); - - - String keystorePath = serverConfig.getKeystorePath(); - String keystorePassword = serverConfig.getKeystorePassword(); - String certType = serverConfig.getCertType(); - SSLContextFactory sslFactory = null; - - if (!serverConfig.getSSLOnly()) - { - - for(int port : ports) + String ver = QpidProperties.getVersionString(); + + StringBuilder protocol = new StringBuilder(); + for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) { - - NetworkDriver driver = new MINANetworkDriver(); - - Set<VERSION> supported = EnumSet.allOf(VERSION.class); - - if(exclude_0_10.contains(port)) - { - supported.remove(VERSION.v0_10); - } - - if(exclude_0_9_1.contains(port)) - { - supported.remove(VERSION.v0_9_1); - } - if(exclude_0_9.contains(port)) - { - supported.remove(VERSION.v0_9); - } - if(exclude_0_8.contains(port)) + if (protocol.length() > 0) { - supported.remove(VERSION.v0_8); + protocol.append(", "); } - - MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); - - - - driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, - serverConfig.getNetworkConfiguration(), null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - - } - - } - - if (serverConfig.getEnableSSL()) - { - sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - NetworkDriver driver = new MINANetworkDriver(); - driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, - new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort())); - } - - CurrentActor.get().message(BrokerMessages.READY()); - - } - finally - { - // Startup is complete so remove the AR initialised Startup actor - CurrentActor.remove(); - } - - - - } - - private void parsePortArray(Set<Integer> ports, String[] portStr) - throws InitException - { - if(portStr != null) - { - for(int i = 0; i < portStr.length; i++) - { - try - { - ports.add(Integer.parseInt(portStr[i])); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid port: " + portStr[i], e); + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); } + + System.out.println(ver + " AMQP version(s) [major.minor]: (" + protocol + ")"); } - } - } - - private void parsePortList(Set<Integer> output, List input) - throws InitException - { - if(input != null) - { - for(Object port : input) + else { - try + BrokerOptions brokerOptions = new BrokerOptions(); + for (String option : BrokerOptions.COMMAND_LINE_OPTIONS) { - output.add(Integer.parseInt(String.valueOf(port))); + brokerOptions.put(option, commandLine.getOptionValues(option)); } - catch (NumberFormatException e) - { - throw new InitException("Invalid port: " + port, e); - } - } - } - } - - /** - * Update the configuration data with the management port. - * @param configuration - * @param managementPort The string from the command line - */ - private void updateManagementPort(ServerConfiguration configuration, String managementPort) - { - if (managementPort != null) - { - try - { - configuration.setJMXManagementPort(Integer.parseInt(managementPort)); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); - } - } - } - - public static void main(String[] args) - { - //if the -Dlog4j.configuration property has not been set, enable the init override - //to stop Log4J wondering off and picking up the first log4j.xml/properties file it - //finds from the classpath when we get the first Loggers - if(System.getProperty("log4j.configuration") == null) - { - System.setProperty("log4j.defaultInitOverride", "true"); - } - - //now that the override status is know, we can instantiate the Loggers - _logger = Logger.getLogger(Main.class); - - new Main(args); - } - - private byte[] parseIP(String address) throws Exception - { - char[] literalBuffer = address.toCharArray(); - int byteCount = 0; - int currByte = 0; - byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; - for (int i = 0; i < literalBuffer.length; i++) - { - char currChar = literalBuffer[i]; - if ((currChar >= '0') && (currChar <= '9')) - { - currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); - } - - if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) - { - ip[byteCount++] = (byte) currByte; - currByte = 0; - } - } - - if (byteCount != 4) - { - throw new Exception("Invalid IP address: " + address); - } - return ip; - } - - private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException - { - if (logConfigFile.exists() && logConfigFile.canRead()) - { - CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); - - if (logWatchTime > 0) - { - System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " - + logWatchTime + " seconds"); - // log4j expects the watch interval in milliseconds + BrokerInstance broker = new BrokerInstance(); try { - QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); - } - catch (Exception e) - { - throw new InitException(e.getMessage(),e); + broker.startup(brokerOptions); } - } - else - { - try + catch (BrokerInstance.InitException e) { - QpidLog4JConfigurator.configure(logConfigFile.getPath()); + System.out.println("Error initialising message broker: " + e); + e.printStackTrace(); + broker.shutdown(); } - catch (Exception e) + catch (Throwable e) { - throw new InitException(e.getMessage(),e); + System.out.println("Error running message broker: " + e); + e.printStackTrace(); + broker.shutdown(); } } } - else + catch (ParseException pe) { - System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); - System.err.println("Using the fallback internal log4j.properties configuration"); - - InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); - if(propsFile == null) - { - throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); - } - else - { - try - { - Properties fallbackProps = new Properties(); - fallbackProps.load(propsFile); - PropertyConfigurator.configure(fallbackProps); - } - finally - { - propsFile.close(); - } - } + System.err.println("Error: " + pe.getMessage()); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Qpid", options, true); } } - - private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception - { - LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); - - blm.register(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java index 0dd36fe1fe..2a480ddd6d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfig.java @@ -21,11 +21,13 @@ package org.apache.qpid.server.configuration; +import java.net.SocketAddress; + public interface ConnectionConfig extends ConfiguredObject<ConnectionConfigType, ConnectionConfig> { VirtualHostConfig getVirtualHost(); - String getAddress(); + SocketAddress getRemoteAddress(); Boolean isIncoming(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfigType.java index 9750b12dea..90bca5f89c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfigType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConnectionConfigType.java @@ -63,7 +63,7 @@ public final class ConnectionConfigType extends ConfigObjectType<ConnectionConfi { public String getValue(ConnectionConfig object) { - return object.getAddress(); + return object.getRemoteAddress().toString(); } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 7197ec8cdc..6e64b9ebb2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -42,7 +42,6 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriverConfiguration; import sun.misc.Signal; import sun.misc.SignalHandler; @@ -682,7 +681,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public String getBind() { - return getStringValue("connector.bind", "wildcard"); + return getStringValue("connector.bind", "*"); } public int getReceiveBufferSize() @@ -767,59 +766,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa DEFAULT_HOUSEKEEPING_PERIOD)); } - public NetworkDriverConfiguration getNetworkConfiguration() - { - return new NetworkDriverConfiguration() - { - - public Integer getTrafficClass() - { - return null; - } - - public Boolean getTcpNoDelay() - { - // Can't call parent getTcpNoDelay since it just calls this one - return getBooleanValue("connector.tcpNoDelay", true); - } - - public Integer getSoTimeout() - { - return null; - } - - public Integer getSoLinger() - { - return null; - } - - public Integer getSendBufferSize() - { - return getBufferWriteLimit(); - } - - public Boolean getReuseAddress() - { - return null; - } - - public Integer getReceiveBufferSize() - { - return getBufferReadLimit(); - } - - public Boolean getOOBInline() - { - return null; - } - - public Boolean getKeepAlive() - { - return null; - } - }; - } - public int getMaxChannelCount() { return getIntValue("maximumChannelCount", 256); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index fa2fb9ead1..24b409f224 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -36,6 +36,8 @@ import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; import org.apache.qpid.transport.TransportException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -114,9 +116,9 @@ public class BrokerLink implements LinkConfig, ConnectionListener return BrokerLink.this.getVirtualHost(); } - public String getAddress() + public SocketAddress getRemoteAddress() { - return _host+":"+_port; + return new InetSocketAddress(_host, _port); } public Boolean isIncoming() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index a1ffe272fd..a602788b2c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -71,7 +71,6 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -94,17 +93,17 @@ import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; -public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig +public class AMQProtocolEngine implements Receiver<java.nio.ByteBuffer>, Managable, AMQProtocolSession, ConnectionConfig { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - private static final AtomicLong idGenerator = new AtomicLong(0); - // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -150,13 +149,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; - // Create a simple ID that increments for ever new Session - private final long _sessionID = idGenerator.getAndIncrement(); + private long _sessionID; private AMQPConnectionActor _actor; private LogSubject _logSubject; - private NetworkDriver _networkDriver; + private NetworkTransport _transport; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; private long _lastIoTime; @@ -178,15 +178,17 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkTransport transport, NetworkConnection network, Sender<ByteBuffer> sender, long connectionId) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _networkDriver = driver; - _codecFactory = new AMQCodecFactory(true, this); _poolReference.acquireExecutorService(); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + _transport = transport; + _network = network; + _sender = sender; + _sessionID = connectionId; _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -195,9 +197,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _configStore = virtualHostRegistry.getConfigStore(); _id = _configStore.createId(); - _actor.message(ConnectionMessages.OPEN(null, null, false, false)); - } private AMQProtocolSessionMBean createMBean() throws JMException @@ -363,14 +363,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol null, mechanisms.getBytes(), locales.getBytes()); - _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } } @@ -491,7 +491,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); } @@ -683,8 +683,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { if (delay > 0) { - _networkDriver.setMaxWriteIdle(delay); - _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + // FIXME +// _transport.setMaxWriteIdle(delay); +// _transport.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -788,7 +789,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeProtocolSession() { - _networkDriver.close(); + _sender.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -823,7 +824,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol */ public String getLocalFQDN() { - SocketAddress address = _networkDriver.getLocalAddress(); + SocketAddress address = _transport.getAddress(); // we use the vmpipe address in some tests hence the need for this rather ugly test. The host // information is used by SASL primary. if (address instanceof InetSocketAddress) @@ -912,7 +913,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public Object getClientIdentifier() { - return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + return (_network != null) ? _network.getRemoteAddress() : null; } public VirtualHost getVirtualHost() @@ -971,12 +972,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); + return _network.getRemoteAddress(); } public MethodRegistry getMethodRegistry() @@ -1006,14 +1002,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol // Nothing } - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public void writerIdle() { - _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); } public void exception(Throwable throwable) @@ -1021,7 +1012,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _networkDriver.close(); + _sender.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1039,7 +1030,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol writeFrame(closeBody.generateFrame(0)); - _networkDriver.close(); + _sender.close(); } } @@ -1050,7 +1041,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void setSender(Sender<ByteBuffer> sender) { - // Do nothing + _sender = sender; } public long getReadBytes() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java deleted file mode 100644 index 0e4444725e..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.server.protocol; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; - -public class AMQProtocolEngineFactory implements ProtocolEngineFactory -{ - private VirtualHostRegistry _vhosts; - - public AMQProtocolEngineFactory() - { - this(1); - } - - public AMQProtocolEngineFactory(Integer port) - { - _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry(); - } - - - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) - { - return new AMQProtocolEngine(_vhosts, networkDriver); - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java index eb957ee33c..393a350c07 100755..100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiver.java @@ -20,80 +20,59 @@ */ package org.apache.qpid.server.protocol; +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.protocol.BrokerReceiverFactory.VERSION; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.NetworkDriver; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Set; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; -public class MultiVersionProtocolEngine implements ProtocolEngine +public class BrokerReceiver implements Receiver<java.nio.ByteBuffer> { - private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); - + private static final Logger _logger = Logger.getLogger(BrokerReceiver.class); + private static final AtomicLong _idGenerator = new AtomicLong(0); - private NetworkDriver _networkDriver; + private NetworkConnection _network; + private NetworkTransport _transport; + private Sender<ByteBuffer> _sender; private Set<VERSION> _supported; private String _fqdn; private IApplicationRegistry _appRegistry; + private long _conenctionId; - private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + private volatile Receiver<java.nio.ByteBuffer> _delegate = new SelfDelegateProtocolEngine(); - public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + public BrokerReceiver(IApplicationRegistry appRegistry, String fqdn, - Set<VERSION> supported, NetworkDriver networkDriver) + Set<VERSION> supported, + NetworkTransport transport, + NetworkConnection network) { _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _networkDriver = networkDriver; - } - - public void setNetworkDriver(NetworkDriver driver) - { - _delegate.setNetworkDriver(driver); - } - - public SocketAddress getRemoteAddress() - { - return _delegate.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _delegate.getLocalAddress(); - } - - public long getWrittenBytes() - { - return _delegate.getWrittenBytes(); - } - - public long getReadBytes() - { - return _delegate.getReadBytes(); + _transport = transport; + _network = network; + _sender = _network.getSender(); + _conenctionId = _idGenerator.incrementAndGet(); + + CurrentActor.get().message(ConnectionMessages.OPEN(null, null, false, false)); } public void closed() { _delegate.closed(); - } - - public void writerIdle() - { - _delegate.writerIdle(); - } - - public void readerIdle() - { - _delegate.readerIdle(); + _network.close(); } public void received(ByteBuffer msg) @@ -157,12 +136,11 @@ private static final byte[] AMQP_0_9_1_HEADER = { VERSION getVersion(); byte[] getHeaderIdentifier(); - ProtocolEngine getProtocolEngine(); + Receiver<java.nio.ByteBuffer> getProtocolEngine(); } private DelegateCreator creator_0_8 = new DelegateCreator() { - public VERSION getVersion() { return VERSION.v0_8; @@ -173,110 +151,76 @@ private static final byte[] AMQP_0_9_1_HEADER = return AMQP_0_8_HEADER; } - public ProtocolEngine getProtocolEngine() + public Receiver<java.nio.ByteBuffer> getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); } }; private DelegateCreator creator_0_9 = new DelegateCreator() { - public VERSION getVersion() { return VERSION.v0_9; } - public byte[] getHeaderIdentifier() { return AMQP_0_9_HEADER; } - public ProtocolEngine getProtocolEngine() + public Receiver<java.nio.ByteBuffer> getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); } }; private DelegateCreator creator_0_9_1 = new DelegateCreator() { - public VERSION getVersion() { return VERSION.v0_9_1; } - public byte[] getHeaderIdentifier() { return AMQP_0_9_1_HEADER; } - public ProtocolEngine getProtocolEngine() + public Receiver<java.nio.ByteBuffer> getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _transport, _network, _sender, _conenctionId); } }; - private DelegateCreator creator_0_10 = new DelegateCreator() { - public VERSION getVersion() { return VERSION.v0_10; } - public byte[] getHeaderIdentifier() { return AMQP_0_10_HEADER; } - public ProtocolEngine getProtocolEngine() + public Receiver<java.nio.ByteBuffer> getProtocolEngine() { - final ConnectionDelegate connDelegate = - new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); + final ConnectionDelegate connDelegate = new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); - ServerConnection conn = new ServerConnection(); + ServerConnection conn = new ServerConnection(_conenctionId); conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry); + return new ProtocolEngine_0_10(conn, _appRegistry, _network); } }; - private final DelegateCreator[] _creators = - new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; + private final DelegateCreator[] _creators = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; - private class ClosedDelegateProtocolEngine implements ProtocolEngine + private class ClosedDelegateProtocolEngine implements Receiver<java.nio.ByteBuffer> { - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public long getWrittenBytes() - { - return 0; - } - - public long getReadBytes() - { - return 0; - } - public void received(ByteBuffer msg) { _logger.error("Error processing incoming data, could not negotiate a common protocol"); @@ -291,48 +235,12 @@ private static final byte[] AMQP_0_9_1_HEADER = { } - - public void writerIdle() - { - - } - - public void readerIdle() - { - - } } - private class SelfDelegateProtocolEngine implements ProtocolEngine + private class SelfDelegateProtocolEngine implements Receiver<java.nio.ByteBuffer> { - private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public long getWrittenBytes() - { - return 0; - } - - public long getReadBytes() - { - return 0; - } - public void received(ByteBuffer msg) { ByteBuffer msgheader = msg.duplicate(); @@ -355,7 +263,7 @@ private static final byte[] AMQP_0_9_1_HEADER = _header.get(headerBytes); - ProtocolEngine newDelegate = null; + Receiver<java.nio.ByteBuffer> newDelegate = null; byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) @@ -380,14 +288,12 @@ private static final byte[] AMQP_0_9_1_HEADER = // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - _networkDriver.send(ByteBuffer.wrap(newestSupported)); - + _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.close(); _delegate = new ClosedDelegateProtocolEngine(); } else { - newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; _header.flip(); @@ -397,9 +303,7 @@ private static final byte[] AMQP_0_9_1_HEADER = _delegate.received(msg); } } - } - } public void exception(Throwable t) @@ -411,15 +315,5 @@ private static final byte[] AMQP_0_9_1_HEADER = { } - - public void writerIdle() - { - - } - - public void readerIdle() - { - - } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiverFactory.java index 75358c42d9..4efeb78b88 100755..100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/BrokerReceiverFactory.java @@ -20,56 +20,46 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.qpid.protocol.ReceiverFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; -import java.util.Set; -import java.util.Arrays; -import java.util.HashSet; - -public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory +public class BrokerReceiverFactory implements ReceiverFactory { - ; - - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; - private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); + private static final Set<VERSION> ALL_VERSIONS = EnumSet.allOf(VERSION.class); private final IApplicationRegistry _appRegistry; private final String _fqdn; private final Set<VERSION> _supported; - - public MultiVersionProtocolEngineFactory() - { - this(1, "localhost", ALL_VERSIONS); - } - - public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions) + public BrokerReceiverFactory() { - this(1, fqdn, versions); + this("localhost", ALL_VERSIONS); } - - public MultiVersionProtocolEngineFactory(String fqdn) + public BrokerReceiverFactory(String fqdn) { - this(1, fqdn, ALL_VERSIONS); + this(fqdn, ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions) + public BrokerReceiverFactory(String fqdn, Set<VERSION> supportedVersions) { - _appRegistry = ApplicationRegistry.getInstance(instance); + _appRegistry = ApplicationRegistry.getInstance(); _fqdn = fqdn; _supported = supportedVersions; } - - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public Receiver<ByteBuffer> newReceiver(NetworkTransport transport, NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); + return new BrokerReceiver(_appRegistry, _fqdn, _supported, transport, network); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 30d506a89b..882ec45162 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -20,53 +20,40 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.UUID; -public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig +public class ProtocolEngine_0_10 extends InputHandler implements Receiver<ByteBuffer>, ConnectionConfig { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - private NetworkDriver _networkDriver; - private long _readBytes; - private long _writtenBytes; + private NetworkConnection _network; private ServerConnection _connection; private final UUID _id; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); - public ProtocolEngine_0_10(ServerConnection conn, - NetworkDriver networkDriver, - final IApplicationRegistry appRegistry) + public ProtocolEngine_0_10(ServerConnection conn, final IApplicationRegistry appRegistry, NetworkConnection network) { super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _networkDriver = networkDriver; + _network = network; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - // FIXME Two log messages to maintain compatinbility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); - } - - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE); - _connection.setSender(dis); + _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); _connection.onOpen(new Runnable() { public void run() @@ -75,36 +62,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine } }); - } - - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public long getReadBytes() - { - return _readBytes; - } - - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void writerIdle() - { - //Todo - } - - public void readerIdle() - { - //Todo + _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public VirtualHostConfig getVirtualHost() @@ -112,9 +70,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine return _connection.getVirtualHost(); } - public String getAddress() + public SocketAddress getRemoteAddress() { - return getRemoteAddress().toString(); + return _network.getRemoteAddress(); } public Boolean isIncoming() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 78a642f22f..fd544a7a76 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -54,10 +54,10 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.NetworkTransport; /** * An abstract application registry that provides access to configuration information and handles the @@ -75,7 +75,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static final int DEFAULT_INSTANCE = 1; - protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>(); + protected final Map<Integer, NetworkTransport> _transports = new HashMap<Integer, NetworkTransport>(); protected ManagedObjectRegistry _managedObjectRegistry; @@ -393,22 +393,20 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private void unbind() { - synchronized (_acceptors) + synchronized (_transports) { - for (InetSocketAddress bindAddress : _acceptors.keySet()) + for (Integer port: _transports.keySet()) { - QpidAcceptor acceptor = _acceptors.get(bindAddress); - + NetworkTransport transport = _transports.get(port); try { - acceptor.getNetworkDriver().close(); + transport.close(); } catch (Throwable e) { _logger.error("Unable to close network driver due to:" + e.getMessage()); } - - CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort())); + CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(transport.getAddress().toString(), port)); } } } @@ -418,11 +416,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _configuration; } - public void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor) + public void registerTransport(int port, NetworkTransport transport) { - synchronized (_acceptors) + synchronized (_transports) { - _acceptors.put(bindAddress, acceptor); + _transports.put(port, transport); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 228c3b9112..272c10b79a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.registry; -import java.net.InetSocketAddress; import java.util.UUID; import org.apache.qpid.qmf.QMFService; @@ -35,9 +34,9 @@ import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.network.NetworkTransport; public interface IApplicationRegistry { @@ -78,11 +77,9 @@ public interface IApplicationRegistry RootMessageLogger getRootMessageLogger(); /** - * Register any acceptors for this registry - * @param bindAddress The address that the acceptor has been bound with - * @param acceptor The acceptor in use + * Register any network transports for this registry */ - void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor); + void registerTransport(int port, NetworkTransport transport); public UUID getBrokerId(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d2addfde0c..2f6d0cfa2e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ExecutionErrorCode; @@ -48,10 +49,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private LogActor _actor = GenericActor.getInstance(this); + private long _connectionId; - public ServerConnection() + public ServerConnection(long connectionId) { + _connectionId = connectionId; + } + public long getConnectionId() + { + return _connectionId; } @Override @@ -165,6 +172,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel, CurrentActor.set(channelActor == null ? _actor : channelActor); } + + // Set thread credentials + SecurityManager.setThreadPrincipal(getAuthorizationID()); try { @@ -187,7 +197,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, MessageFormat.format(CONNECTION_FORMAT, getConnectionId(), getClientId(), - getConfig().getAddress(), + getConfig().getRemoteAddress().toString(), getVirtualHost().getName()) + "] "; } @@ -197,7 +207,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, MessageFormat.format(USER_FORMAT, getConnectionId(), getClientId(), - getConfig().getAddress()) + getConfig().getRemoteAddress().toString()) + "] "; } @@ -206,7 +216,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return "[" + MessageFormat.format(SOCKET_FORMAT, getConnectionId(), - getConfig().getAddress()) + getConfig().getRemoteAddress().toString()) + "] "; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 2b9e92f685..68724b2504 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,10 +21,6 @@ package org.apache.qpid.server.transport; import org.apache.qpid.transport.*; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -116,13 +112,11 @@ public class ServerConnectionDelegate extends ServerDelegate } vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); - SecurityManager.setThreadPrincipal(conn.getAuthorizationID()); - if(vhost != null) { sconn.setVirtualHost(vhost); - if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress())) + if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getConfig().getRemoteAddress())) { sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); @@ -139,6 +133,29 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.setState(Connection.State.CLOSING); } } + + @Override + public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) + { + int okChannelMax = ok.getChannelMax(); + + if (okChannelMax > getChannelMax()) + { + ServerConnection sconn = (ServerConnection) conn; + _logger.error("Connection '" + sconn.getConnectionId() + "' from '" + + sconn.getConfig().getRemoteAddress() + "' being severed, " + + "client connectionTuneOk returned a channelMax (" + okChannelMax + + ") above the servers offered limit (" + getChannelMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + conn.getSender().close(); + return; + } + + //0 means no implied limit, except available server resources + //(or that forced by protocol limitations [0xFFFF]) + conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax); + } @Override protected int getHeartbeatMax() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 540ad3fffd..2af6deee3d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -26,7 +26,6 @@ import static org.apache.qpid.util.Serial.gt; import com.sun.security.auth.UserPrincipal; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -611,9 +610,9 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { return "[" + MessageFormat.format(CHANNEL_FORMAT, - getConnection().getConnectionId(), + ((ServerConnection) getConnection()).getConnectionId(), getClientID(), - ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), + getConnectionConfig().getRemoteAddress().toString(), getVirtualHost().getName(), getChannel()) + "] "; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java index 59543874b4..91b5e13168 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java @@ -39,7 +39,7 @@ public class RunBrokerWithCommand // Override the first value which is the command we are going to run later. fudge[0] = "-v"; - new Main(fudge).startup(); + Main.main(fudge); } catch (Exception e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 718874cf69..106239c583 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -23,24 +23,19 @@ package org.apache.qpid.server.configuration; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.List; import java.util.Locale; -import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolEngine; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.TestNetworkDriver; public class ServerConfigurationTest extends InternalBrokerBaseCase { @@ -565,7 +560,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals("wildcard", serverConfig.getBind()); + assertEquals("*", serverConfig.getBind()); // Check value we set _config.setProperty("connector.bind", "a"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 3b6cd37ea9..36a2e054d3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -34,10 +34,10 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; +import org.apache.qpid.transport.TestNetworkTransport; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { @@ -47,7 +47,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { - super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver()); + super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkTransport(), new TestNetworkConnection(), null, 0); _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java index d7eb138523..aeeb011d38 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -21,9 +21,9 @@ package org.apache.qpid.example.transport; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.url.URLSyntaxException; @@ -83,7 +83,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener Socket socket = SocketChannel.open().socket(); socket.connect(new InetSocketAddress("localhost", 5672)); - TransportConnection.registerOpenSocket(Socket1_ID, socket); + ExistingSocketConnector.registerOpenSocket(Socket1_ID, socket); _connection = new AMQConnection(CONNECTION); @@ -140,7 +140,7 @@ public class ExistingSocketConnectorDemo implements ConnectionListener socket.connect(new InetSocketAddress("localhost", 5673)); // This is the new method to pass in an open socket for the connection to use. - TransportConnection.registerOpenSocket(Socket2_ID, socket); + ExistingSocketConnector.registerOpenSocket(Socket2_ID, socket); } catch (IOException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ab59fee020..ca37c3c8af 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -61,6 +61,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; @@ -83,6 +84,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); + public static final String VERSION_0_8 = "0-8"; + public static final String VERSION_0_9 = "0-9"; + public static final String VERSION_0_9_1 = "0-9-1"; + public static final String VERSION_0_91 = "0-91"; + public static final String VERSION_0_10 = "0-10"; + + public static final String[] SUPPORTED_VERSIONS = new String[] { + VERSION_0_8, VERSION_0_9, VERSION_0_9_1, VERSION_0_91, VERSION_0_10 + }; /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -318,27 +328,33 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } - String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), VERSION_0_10).trim(); _logger.debug("AMQP version " + amqpVersion); _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion)) + + // Check AMQP version and set delegate accordingly + if (VERSION_0_8.equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); } - else if ("0-9".equals(amqpVersion)) + else if (VERSION_0_9.equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_0_9(this); } - else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) + else if (VERSION_0_9_1.equals(amqpVersion) || VERSION_0_91.equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_9_1(this); } - else + else if (VERSION_0_10.equals(amqpVersion) || "".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_0_10(this); } + else + { + throw new AMQProtocolVersionException("AMQP version " + amqpVersion + " is not supported"); + } if (_logger.isInfoEnabled()) { @@ -1255,8 +1271,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { //Should never get here as all AMQEs are required to have an ErrorCode! // Other than AMQDisconnectedEx! - - if (cause instanceof AMQDisconnectedException) + if (cause instanceof AMQDisconnectedException && !getProtocolVersion().equals(ProtocolVersion.v0_10)) { Exception last = _protocolHandler.getStateManager().getLastException(); if (last != null) @@ -1287,7 +1302,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _closing.set(false); closer = !_closed.getAndSet(true); - _protocolHandler.getProtocolSession().notifyError(je); + if (!getProtocolVersion().equals(ProtocolVersion.v0_10)) + { + _protocolHandler.getProtocolSession().notifyError(je); + } } // get the failover mutex before trying to close diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index b0bd8f8e97..e5a3d1d9f8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -20,7 +20,6 @@ package org.apache.qpid.client; * */ - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -166,15 +165,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { if (_logger.isDebugEnabled()) { - _logger.debug("connecting to host: " + brokerDetail.getHost() - + " port: " + brokerDetail.getPort() + " vhost: " - + _conn.getVirtualHost() + " username: " - + _conn.getUsername() + " password: " - + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword() + + " transport: " + brokerDetail.getTransport()); } - ConnectionSettings conSettings = new ConnectionSettings(); - retriveConnectionSettings(conSettings,brokerDetail); + ConnectionSettings conSettings = retriveConnectionSettings(brokerDetail); _qpidConnection.connect(conSettings); _conn._connected = true; @@ -278,24 +277,9 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().setFailoverLatch(null); } } - - ExceptionListener listener = _conn._exceptionListener; - if (listener == null) - { - _logger.error("connection exception: " + conn, exc); - } else { - String code = null; - if (close != null) - { - code = close.getReplyCode().toString(); - } - - JMSException ex = new JMSException(exc.getMessage(), code); - ex.setLinkedException(exc); - ex.initCause(exc); - listener.onException(ex); + _conn.exceptionReceived(exc); } } @@ -327,15 +311,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return ProtocolVersion.v0_10; } - private void retriveConnectionSettings(ConnectionSettings conSettings, BrokerDetails brokerDetail) + private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) { - + ConnectionSettings conSettings = new ConnectionSettings(); conSettings.setHost(brokerDetail.getHost()); conSettings.setPort(brokerDetail.getPort()); conSettings.setVhost(_conn.getVirtualHost()); conSettings.setUsername(_conn.getUsername()); conSettings.setPassword(_conn.getPassword()); - + conSettings.setProtocol(brokerDetail.getTransport()); + // ------------ sasl options --------------- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null) { @@ -417,6 +402,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); + + return conSettings; } // The idle_timeout prop is in milisecs while diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 40b332d216..266785aea7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -39,7 +39,6 @@ import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -49,6 +48,11 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,22 +90,27 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - // TODO: use system property thingy for this - if (System.getProperty("UseTransportIo", "false").equals("false")) - { - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - } - else + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(brokerDetail.getHost()); + settings.setPort(brokerDetail.getPort()); + settings.setProtocol(brokerDetail.getTransport()); + + SSLConfiguration sslConfig = _conn.getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) { - _conn.getProtocolHandler().createIoTransportSession(brokerDetail); + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } + + OutgoingNetworkTransport transport = Transport.getOutgoingTransport(settings.getProtocol()); + NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory); + _conn._protocolHandler.connect(transport, network); _conn._protocolHandler.getProtocolSession().init(); + // this blocks until the connection has been set up or when an error // has prevented the connection being set up - AMQState state = waiter.await(); if(state == AMQState.CONNECTION_OPEN) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 517a7a5ce8..4480ab8660 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -32,11 +32,13 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Destination; import javax.jms.JMSException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; @@ -69,6 +71,7 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.SessionTimeoutException; import org.apache.qpid.util.Serial; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,7 +100,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void run() { AMQSession_0_10 ssn = session.get(); - if (ssn == null) + if (ssn == null || ssn.getQpidSession().isClosing()) { cancel(); } @@ -124,8 +127,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * The latest qpid Exception that has been raised. */ - private Object _currentExceptionLock = new Object(); - private AMQException _currentException; + private AtomicReference<AMQException> _currentException = new AtomicReference<AMQException>(null); // a ref on the qpid connection protected org.apache.qpid.transport.Connection _qpidConnection; @@ -871,19 +873,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Get the latest thrown exception. * - * @throws SessionException get the latest thrown error. + * @return the latest thrown {@link AMQException} */ public AMQException getCurrentException() { - AMQException amqe = null; - synchronized (_currentExceptionLock) - { - if (_currentException != null) - { - amqe = _currentException; - _currentException = null; - } - } + AMQException amqe = _currentException.getAndSet(null); return amqe; } @@ -1011,7 +1005,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void setCurrentException(SessionException se) { - synchronized (_currentExceptionLock) + AMQException amqe; + if (se instanceof SessionTimeoutException) + { + amqe = new AMQTimeoutException(se.getMessage(), se); + } + else { ExecutionException ee = se.getException(); int code = AMQConstant.INTERNAL_ERROR.getCode(); @@ -1019,12 +1018,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { code = ee.getErrorCode().getValue(); } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); + } - _connection.exceptionReceived(amqe); + _currentException.set(amqe); - _currentException = amqe; - } + _connection.exceptionReceived(amqe); } public AMQMessageDelegateFactory getMessageDelegateFactory() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index eb5af119b2..bf520e64ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -63,10 +63,11 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +121,7 @@ import org.slf4j.LoggerFactory; * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ -public class AMQProtocolHandler implements ProtocolEngine +public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer> { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); @@ -172,7 +173,9 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); - private NetworkDriver _networkDriver; + private Sender<ByteBuffer> _sender; + private NetworkConnection _network; + private NetworkTransport _transport; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; @@ -211,21 +214,6 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * Called when we want to create a new IoTransport session - * @param brokerDetail - */ - public void createIoTransportSession(BrokerDetails brokerDetail) - { - _protocolSession = new AMQProtocolSession(this, _connection); - _stateManager.setProtocolSession(_protocolSession); - IoTransport.connect_0_9(getProtocolSession(), - brokerDetail.getHost(), - brokerDetail.getPort(), - brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); - _protocolSession.init(); - } - - /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -315,7 +303,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _networkDriver.close(); + _sender.close(); } public void writerIdle() @@ -337,7 +325,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _networkDriver.close(); + _sender.close(); closed(); } else @@ -589,7 +577,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); if (PROTOCOL_DEBUG) @@ -610,7 +598,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _networkDriver.flush(); + _sender.flush(); } } @@ -724,7 +712,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _networkDriver.close(); + _sender.close(); closed(); } catch (AMQTimeoutException e) @@ -735,6 +723,10 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } + finally + { + _network.close(); + } } _poolReference.releaseExecutorService(); } @@ -844,17 +836,19 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } - public void setNetworkDriver(NetworkDriver driver) + public void connect(NetworkTransport transport, NetworkConnection network) { - _networkDriver = driver; + _transport = transport; + _network = network; + _sender = network.getSender(); } /** @param delay delay in seconds (not ms) */ @@ -862,20 +856,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - getNetworkDriver().setMaxWriteIdle(delay); - getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); +// FIXME +// _sender.setMaxWriteIdle(delay); +// _sender.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkDriver getNetworkDriver() - { - return _networkDriver; - } - public ProtocolVersion getSuggestedProtocolVersion() { return _suggestedProtocolVersion; } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java deleted file mode 100644 index 6e47e2ce28..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import org.apache.qpid.jms.BrokerDetails; - -/** - * AMQNoTransportForProtocolException represents a connection failure where there is no transport medium to connect - * to the broker available. This may be the case if their is a error in the connection url, or an unsupported transport - * type is specified. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent absence of a transport medium. - * </table> - * - * @todo Error code never used. This is not an AMQException. - */ -public class AMQNoTransportForProtocolException extends AMQTransportConnectionException -{ - BrokerDetails _details; - - public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause) - { - super(null, message, cause); - - _details = details; - } - - public String toString() - { - if (_details != null) - { - return super.toString() + _details.toString(); - } - else - { - return super.toString(); - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java deleted file mode 100644 index 1ac8f62e32..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - private SocketConnectorFactory _socketConnectorFactory; - - static interface SocketConnectorFactory - { - IoConnector newSocketConnector(); - } - - public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory) - { - _socketConnectorFactory = socketConnectorFactory; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!Boolean.getBoolean("amqj.enablePooledAllocator")) - { - _logger.info("Using SimpleByteBufferAllocator"); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - final InetSocketAddress address; - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - address = null; - } - else - { - address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); - } - - SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); - SSLContextFactory sslFactory = null; - if (sslConfig != null) - { - sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - } - - MINANetworkDriver driver = new MINANetworkDriver(ioConnector); - driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); - protocolHandler.setNetworkDriver(driver); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java deleted file mode 100644 index aef3a563af..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; -import java.net.Socket; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.vmpipe.VmPipeAcceptor; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying - * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA - * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete - * class/interface combo. - */ -public class TransportConnection -{ - private static ITransportConnection _instance; - - private static final Map _inVmPipeAddress = new HashMap(); - private static VmPipeAcceptor _acceptor; - private static int _currentInstance = -1; - private static int _currentVMPort = -1; - - private static final int TCP = 0; - private static final int VM = 1; - private static final int SOCKET = 2; - - private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; - - private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); - - public static void registerOpenSocket(String socketID, Socket openSocket) - { - _openSocketRegister.put(socketID, openSocket); - } - - public static Socket removeOpenSocket(String socketID) - { - return _openSocketRegister.remove(socketID); - } - - public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException - { - int transport = getTransport(details.getTransport()); - - if (transport == -1) - { - throw new AMQNoTransportForProtocolException(details, null, null); - } - - switch (transport) - { - case SOCKET: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); - - Socket socket = TransportConnection.removeOpenSocket(details.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) connector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + details); - } - return connector; - } - }); - case TCP: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") - ? "Qpid NIO is new default" - : "Sysproperty 'qpidnio' is set")); - result = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); - } - else - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - } - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - return result; - } - }); - case VM: - { - return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - } - default: - throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null); - } - } - - private static int getTransport(String transport) - { - if (transport.equals(BrokerDetails.SOCKET)) - { - return SOCKET; - } - - if (transport.equals(BrokerDetails.TCP)) - { - return TCP; - } - - if (transport.equals(BrokerDetails.VM)) - { - return VM; - } - - return -1; - } - - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException - { - int port = details.getPort(); - - synchronized (_inVmPipeAddress) - { - if (!_inVmPipeAddress.containsKey(port)) - { - if (AutoCreate) - { - _logger.warn("Auto Creating InVM Broker on port:" + port); - createVMBroker(port); - } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); - } - } - } - - return new VmPipeTransportConnection(port); - } - - public static void createVMBroker(int port) throws AMQVMBrokerCreationException - { - synchronized(TransportConnection.class) - { - if (_acceptor == null) - { - _acceptor = new VmPipeAcceptor(); - - IoServiceConfig config = _acceptor.getDefaultConfig(); - } - } - synchronized (_inVmPipeAddress) - { - - if (!_inVmPipeAddress.containsKey(port)) - { - _logger.info("Creating InVM Qpid.AMQP listening on port " + port); - IoHandlerAdapter provider = null; - try - { - VmPipeAddress pipe = new VmPipeAddress(port); - - provider = createBrokerInstance(port); - - _acceptor.bind(pipe, provider); - - _inVmPipeAddress.put(port, pipe); - _logger.info("Created InVM Qpid.AMQP listening on port " + port); - } - catch (IOException e) - { - _logger.error("Got IOException.", e); - - // Try and unbind provider - try - { - VmPipeAddress pipe = new VmPipeAddress(port); - - try - { - _acceptor.unbind(pipe); - } - catch (Exception ignore) - { - // ignore - } - - if (provider == null) - { - provider = createBrokerInstance(port); - } - - _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); - _logger.info("Created InVM Qpid.AMQP listening on port " + port); - } - catch (IOException justUseFirstException) - { - String because; - if (e.getCause() == null) - { - because = e.toString(); - } - else - { - because = e.getCause().toString(); - } - - throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); - } - } - - } - else - { - _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); - } - } - } - - private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException - { - String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER); - _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); - - // can't use introspection to get Provider as it is a server class. - // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. - - // get right constructor and pass in instancec ID - "port" - IoHandlerAdapter provider; - try - { - Class[] cnstr = {Integer.class}; - Object[] params = {port}; - - provider = new MINANetworkDriver(); - ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); - // Give the broker a second to create - _logger.info("Created VMBroker Instance:" + port); - } - catch (Exception e) - { - _logger.info("Unable to create InVM Qpid.AMQP on port " + port + ". Because: " + e.getCause()); - String because; - if (e.getCause() == null) - { - because = e.toString(); - } - else - { - because = e.getCause().toString(); - } - - AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); - throw amqbce; - } - - return provider; - } - - public static void killAllVMBrokers() - { - _logger.info("Killing all VM Brokers"); - synchronized(TransportConnection.class) - { - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - synchronized (_inVmPipeAddress) - { - _inVmPipeAddress.clear(); - } - _acceptor = null; - } - _currentInstance = -1; - _currentVMPort = -1; - } - - public static void killVMBroker(int port) - { - synchronized (_inVmPipeAddress) - { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) - { - _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); - // This does need to be sychronized as otherwise mina can hang - // if a new connection is made - _acceptor.unbind(pipe); - } - } - } - -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java deleted file mode 100644 index 87cc2e7a5a..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VmPipeTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); - - private int _port; - - private MINANetworkDriver _networkDriver; - - public VmPipeTransportConnection(int port) - { - _port = port; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - - final VmPipeAddress address = new VmPipeAddress(_port); - _logger.info("Attempting connection to " + address); - _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); - protocolHandler.setNetworkDriver(_networkDriver); - ConnectFuture future = ioConnector.connect(address, _networkDriver); - // wait for connection to complete - future.join(); - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - _networkDriver.setProtocolEngine(protocolHandler); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java deleted file mode 100644 index dc0d9b8c78..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.vmbroker; - -import org.apache.qpid.client.transport.AMQTransportConnectionException; -import org.apache.qpid.protocol.AMQConstant; - -/** - * AMQVMBrokerCreationException represents failure to create an in VM broker on the vm transport medium. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an in VM broker. - * </table> - * - * @todo Error code never used. This is not an AMQException. - */ -public class AMQVMBrokerCreationException extends AMQTransportConnectionException -{ - private int _port; - - /** - * @param port - * - * @deprecated - */ - public AMQVMBrokerCreationException(int port) - { - this(null, port, "Unable to create vm broker", null); - } - - public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) - { - super(errorCode, message, cause); - _port = port; - } - - public String toString() - { - return super.toString() + " on port " + _port; - } -} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index f520a21ba0..f0912a69f2 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -27,7 +27,8 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; +import org.apache.qpid.transport.TestNetworkTransport; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.state.AMQState; @@ -73,7 +74,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - _handler.setNetworkDriver(new TestNetworkDriver()); + _handler.connect(new TestNetworkTransport(), new TestNetworkConnection()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); diff --git a/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java index 98716c0c3c..db956c2381 100644 --- a/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java +++ b/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java @@ -19,6 +19,7 @@ */ package org.apache.mina.transport.socket.nio; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.Executor; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExceptionMonitor; @@ -28,6 +29,9 @@ import org.apache.mina.common.IoHandler; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.support.BaseIoConnector; import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketIoProcessor; +import org.apache.mina.transport.socket.nio.SocketSessionImpl; import org.apache.mina.util.NamePreservingRunnable; import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.Queue; @@ -40,6 +44,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; +import java.util.Map; import java.util.Set; /** @@ -50,6 +55,8 @@ import java.util.Set; */ public class ExistingSocketConnector extends BaseIoConnector { + private static final Map<String, Socket> OPEN_SOCKET_REGISTER = new ConcurrentHashMap(); + /** @noinspection StaticNonFinalField */ private static volatile int nextId = 0; @@ -69,6 +76,16 @@ public class ExistingSocketConnector extends BaseIoConnector private int workerTimeout = 60; // 1 min. private Socket _openSocket = null; + public static void registerOpenSocket(String socketID, Socket openSocket) + { + OPEN_SOCKET_REGISTER.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return OPEN_SOCKET_REGISTER.remove(socketID); + } + /** Create a connector with a single processing thread using a NewThreadExecutor */ public ExistingSocketConnector() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java b/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java new file mode 100644 index 0000000000..2cfac85abb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java @@ -0,0 +1,229 @@ +package org.apache.qpid; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +public class BrokerOptions extends HashMap<String, List<String>> +{ + /** serialVersionUID */ + private static final long serialVersionUID = 8051825964945442234L; + + public static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; + public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + public static final String QPID_HOME = "QPID_HOME"; + + public static final String PORTS = "p"; + public static final String EXCLUDE_0_10 = "exclude-0-10"; + public static final String EXCLUDE_0_9_1 = "exclude-0-9-1"; + public static final String EXCLUDE_0_9 = "exclude-0-9"; + public static final String EXCLUDE_0_8 = "exclude-0-8"; + public static final String BIND = "b"; + public static final String MANAGEMENT = "m"; + public static final String LOG4J = "l"; + public static final String WATCH = "w"; + public static final String CONFIG = "c"; + public static final String PROTOCOL = "protocol"; + + public static final String[] COMMAND_LINE_OPTIONS = new String[] { + PORTS, EXCLUDE_0_10, EXCLUDE_0_9_1, EXCLUDE_0_9, EXCLUDE_0_8, + BIND, MANAGEMENT, LOG4J, WATCH, CONFIG, + }; + + public void setPorts(Integer...ports) + { + put(PORTS, ports); + } + + public List<Integer> getPorts() + { + return getList(PORTS); + } + + public void setExclude_0_10Ports(Integer...ports) + { + put(EXCLUDE_0_10, ports); + } + + public List<Integer> getExclude_0_10Ports() + { + return getList(EXCLUDE_0_10); + } + + public void setExclude_0_9_1Ports(Integer...ports) + { + put(EXCLUDE_0_9_1, ports); + } + + public List<Integer> getExclude_0_9_1Ports() + { + return getList(EXCLUDE_0_9_1); + } + + public void setExclude_0_9Ports(Integer...ports) + { + put(EXCLUDE_0_9, ports); + } + + public List<Integer> getExclude_0_9Ports() + { + return getList(EXCLUDE_0_9); + } + + public void setExclude_0_8Ports(Integer...ports) + { + put(EXCLUDE_0_8, ports); + } + + public List<Integer> getExclude_0_8Ports() + { + return getList(EXCLUDE_0_8); + } + + public void setManagementPort(Integer management) + { + put(MANAGEMENT, Integer.toString(management)); + } + + public Integer getManagementPort() + { + return getInteger(MANAGEMENT); + } + + public void setBind(String bind) + { + put(BIND, bind); + } + + public String getBind() + { + return getValue(BIND); + } + + public void setLog4JFile(String log4j) + { + put(LOG4J, log4j); + } + + public String getLog4JFile() + { + return getValue(LOG4J); + } + + public void setLog4JWatch(Integer watch) + { + put(WATCH, Integer.toString(watch)); + } + + public Integer getLog4JWatch() + { + return getInteger(WATCH); + } + + public void setConfigFile(String config) + { + put(CONFIG, config); + } + + public String getConfigFile() + { + return getValue(CONFIG); + } + + public void setProtocol(String protocol) + { + put(PROTOCOL, protocol); + } + + public String getProtocol() + { + return getValue(PROTOCOL); + } + + public void put(String key, String value) + { + if (value != null) + { + put(key, Collections.singletonList(value)); + } + } + + public void put(String key, String...values) + { + if (values != null) + { + put(key, Arrays.asList(values)); + } + } + + public void put(String key, Integer...values) + { + List<String> list = new ArrayList<String>(); + for (Integer i : values) + { + list.add(Integer.toString(i)); + } + put(key, list); + } + + public Integer getInteger(Object key) + { + return getInteger(key, null); + } + + public Integer getInteger(Object key, Integer defaultValue) + { + if (!containsKey(key)) + { + return defaultValue; + } + List<String> values = get(key); + return Integer.valueOf(values.get(0)); + } + + public List<Integer> getList(Object key) + { + return getList(key, null); + } + + public List<Integer> getList(Object key, List<Integer> defaultValues) + { + if (!containsKey(key)) + { + return defaultValues; + } + List<String> list = get(key); + List<Integer> values = new ArrayList<Integer>(); + for (String s : list) + { + values.add(Integer.valueOf(s)); + } + return values; + } + + public String getValue(Object key) + { + return getValue(key, null); + } + + public String getValue(Object key, String defaultValue) + { + if (!containsKey(key)) + { + return defaultValue; + } + List<String> values = get(key); + return values.get(0); + } + + public List<String> get(Object key, List<String> defaultValues) + { + if (!containsKey(key)) + { + return defaultValues; + } + return get(key); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java deleted file mode 100644 index 5423bbb68f..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.mina.MinaHandler; - -import static org.apache.qpid.transport.util.Functions.str; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - - -/** - * ToyBroker - * - * @author Rafael H. Schloming - */ - -class ToyBroker extends SessionDelegate -{ - - private ToyExchange exchange; - private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>(); - - public ToyBroker(ToyExchange exchange) - { - this.exchange = exchange; - } - - public void messageAcquire(Session context, MessageAcquire struct) - { - System.out.println("\n==================> messageAcquire " ); - context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); - } - - @Override public void queueDeclare(Session ssn, QueueDeclare qd) - { - exchange.createQueue(qd.getQueue()); - System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); - } - - @Override public void exchangeBind(Session ssn, ExchangeBind qb) - { - exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); - } - - @Override public void queueQuery(Session ssn, QueueQuery qq) - { - QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult((int) qq.getId(), result); - } - - @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) - { - Consumer c = new Consumer(); - c._queueName = ms.getQueue(); - consumers.put(ms.getDestination(),c); - System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n"); - } - - @Override public void messageFlow(Session ssn,MessageFlow struct) - { - Consumer c = consumers.get(struct.getDestination()); - c._credit = struct.getValue(); - System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n"); - } - - @Override public void messageFlush(Session ssn,MessageFlush struct) - { - System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n"); - checkAndSendMessagesToConsumer(ssn,struct.getDestination()); - } - - @Override public void messageTransfer(Session ssn, MessageTransfer xfr) - { - String dest = xfr.getDestination(); - System.out.println("received transfer " + dest); - Header header = xfr.getHeader(); - DeliveryProperties props = header.get(DeliveryProperties.class); - if (props != null) - { - System.out.println("received headers routing_key " + props.getRoutingKey()); - } - - MessageProperties mp = header.get(MessageProperties.class); - System.out.println("MP: " + mp); - if (mp != null) - { - System.out.println(mp.getApplicationHeaders()); - } - - if (exchange.route(dest,props == null ? null : props.getRoutingKey(),xfr)) - { - System.out.println("queued " + xfr); - dispatchMessages(ssn); - } - else - { - - if (props == null || !props.getDiscardUnroutable()) - { - RangeSet ranges = new RangeSet(); - ranges.add(xfr.getId()); - ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, - "no such destination"); - } - } - ssn.processed(xfr); - } - - private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m) - { - System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - m.getHeader(), m.getBody()); - } - - private void dispatchMessages(Session ssn) - { - for (String dest: consumers.keySet()) - { - checkAndSendMessagesToConsumer(ssn,dest); - } - } - - private void checkAndSendMessagesToConsumer(Session ssn,String dest) - { - Consumer c = consumers.get(dest); - LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName); - MessageTransfer m = queue.poll(); - while (m != null && c._credit>0) - { - transferMessageToPeer(ssn,dest,m); - c._credit--; - m = queue.poll(); - } - } - - // ugly, but who cares :) - // assumes unit is always no of messages, not bytes - // assumes it's credit mode and not window - private static class Consumer - { - long _credit; - String _queueName; - } - - private static final class ToyBrokerSession extends Session - { - - public ToyBrokerSession(Connection connection, Binary name, long expiry, ToyExchange exchange) - { - super(connection, new ToyBroker(exchange), name, expiry); - } - } - - public static final void main(String[] args) throws IOException - { - final ToyExchange exchange = new ToyExchange(); - ConnectionDelegate delegate = new ServerDelegate() - { - @Override - public void init(Connection conn, ProtocolHeader hdr) - { - conn.setSessionFactory(new Connection.SessionFactory() - { - public Session newSession(Connection conn, Binary name, long expiry) - { - return new ToyBrokerSession(conn, name, expiry, exchange); - } - }); - - super.init(conn, hdr); //To change body of overridden methods use File | Settings | File Templates. - } - - }; - - MinaHandler.accept("0.0.0.0", 5672, delegate); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java deleted file mode 100644 index 5b2db10613..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid; - -import java.nio.*; -import java.util.*; - -import org.apache.qpid.transport.*; -import org.apache.qpid.transport.network.mina.MinaHandler; - - -/** - * ToyClient - * - * @author Rafael H. Schloming - */ - -class ToyClient implements SessionListener -{ - public void opened(Session ssn) {} - - public void resumed(Session ssn) {} - - public void exception(Session ssn, SessionException exc) - { - exc.printStackTrace(); - } - - public void message(Session ssn, MessageTransfer xfr) - { - System.out.println("msg: " + xfr); - } - - public void closed(Session ssn) {} - - public static final void main(String[] args) - { - Connection conn = new Connection(); - conn.connect("0.0.0.0", 5672, null, "guest", "guest", false); - Session ssn = conn.createSession(); - ssn.setSessionListener(new ToyClient()); - - ssn.queueDeclare("asdf", null, null); - ssn.sync(); - - Map<String,Object> nested = new LinkedHashMap<String,Object>(); - nested.put("list", Arrays.asList("one", "two", "three")); - Map<String,Object> map = new LinkedHashMap<String,Object>(); - - map.put("str", "this is a string"); - - map.put("+int", 3); - map.put("-int", -3); - map.put("maxint", Integer.MAX_VALUE); - map.put("minint", Integer.MIN_VALUE); - - map.put("+short", (short) 1); - map.put("-short", (short) -1); - map.put("maxshort", (short) Short.MAX_VALUE); - map.put("minshort", (short) Short.MIN_VALUE); - - map.put("float", (float) 3.3); - map.put("double", 4.9); - map.put("char", 'c'); - - map.put("table", nested); - map.put("list", Arrays.asList(1, 2, 3)); - map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); - - ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - new Header(new DeliveryProperties(), - new MessageProperties() - .setApplicationHeaders(map)), - "this is the data"); - - ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - null, - "this should be rejected"); - ssn.sync(); - - Future<QueueQueryResult> future = ssn.queueQuery("asdf"); - System.out.println(future.get().getQueue()); - ssn.sync(); - ssn.close(); - conn.close(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java deleted file mode 100644 index da6aed9629..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java +++ /dev/null @@ -1,154 +0,0 @@ -package org.apache.qpid; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.qpid.transport.MessageTransfer; - - -public class ToyExchange -{ - final static String DIRECT = "amq.direct"; - final static String TOPIC = "amq.topic"; - - private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); - private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); - private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); - - public void createQueue(String name) - { - queues.put(name, new LinkedBlockingQueue<MessageTransfer>()); - } - - public LinkedBlockingQueue<MessageTransfer> getQueue(String name) - { - return queues.get(name); - } - - public void bindQueue(String type,String binding,String queueName) - { - LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName); - binding = normalizeKey(binding); - if(DIRECT.equals(type)) - { - - if (directEx.containsKey(binding)) - { - List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding); - list.add(queue); - } - else - { - List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); - list.add(queue); - directEx.put(binding,list); - } - } - else - { - if (topicEx.containsKey(binding)) - { - List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding); - list.add(queue); - } - else - { - List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>(); - list.add(queue); - topicEx.put(binding,list); - } - } - } - - public boolean route(String dest, String routingKey, MessageTransfer msg) - { - List<LinkedBlockingQueue<MessageTransfer>> queues; - if(DIRECT.equals(dest)) - { - queues = directEx.get(routingKey); - } - else - { - queues = matchWildCard(routingKey); - } - if(queues != null && queues.size()>0) - { - System.out.println("Message stored in " + queues.size() + " queues"); - storeMessage(msg,queues); - return true; - } - else - { - System.out.println("Message unroutable " + msg); - return false; - } - } - - private String normalizeKey(String routingKey) - { - if(routingKey.indexOf(".*")>1) - { - return routingKey.substring(0,routingKey.indexOf(".*")); - } - else - { - return routingKey; - } - } - - private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey) - { - List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>(); - - for(String key: topicEx.keySet()) - { - Pattern p = Pattern.compile(key); - Matcher m = p.matcher(routingKey); - if (m.find()) - { - for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key)) - { - selected.add(queue); - } - } - } - - return selected; - } - - private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected) - { - for(LinkedBlockingQueue<MessageTransfer> queue : selected) - { - queue.offer(msg); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java index c9b0973ea6..2e78df2d89 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -36,4 +36,9 @@ public class AMQProtocolVersionException extends AMQProtocolHeaderException { super(message, cause); } + + public AMQProtocolVersionException(String message) + { + super(message, null); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java deleted file mode 100644 index 31953ea6ab..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.protocol; - -import java.net.SocketAddress; - -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.Receiver; - -/** - * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received - * decodes it and then process the result. - */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> -{ - // Sets the network driver providing data for this ProtocolEngine - void setNetworkDriver (NetworkDriver driver); - - // Returns the remote address of the NetworkDriver - SocketAddress getRemoteAddress(); - - // Returns the local address of the NetworkDriver - SocketAddress getLocalAddress(); - - // Returns number of bytes written - long getWrittenBytes(); - - // Returns number of bytes read - long getReadBytes(); - - // Called by the NetworkDriver when the socket has been closed for reading - void closed(); - - // Called when the NetworkEngine has not written data for the specified period of time (will trigger a - // heartbeat) - void writerIdle(); - - // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) - void readerIdle(); - - -}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java index 3ca22b60c8..2455d58a31 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java @@ -18,27 +18,18 @@ * under the License. * */ -package org.apache.qpid.server.transport; +package org.apache.qpid.protocol; -import org.apache.qpid.transport.NetworkDriver; +import java.nio.ByteBuffer; -public class QpidAcceptor -{ - NetworkDriver _driver; - String _protocol; - public QpidAcceptor(NetworkDriver driver, String protocol) - { - _driver = driver; - _protocol = protocol; - } +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; - public NetworkDriver getNetworkDriver() - { - return _driver; - } - - public String toString() - { - return _protocol; - } -} +public interface ReceiverFactory +{ + /** + * Returns a new instance of a {@link Receiver}. + */ + Receiver<ByteBuffer> newReceiver(NetworkTransport transport, NetworkConnection network); +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e5e10c0e07..8afb4b0db3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,13 +20,9 @@ */ package org.apache.qpid.transport; -import static org.apache.qpid.transport.Connection.State.CLOSED; -import static org.apache.qpid.transport.Connection.State.CLOSING; -import static org.apache.qpid.transport.Connection.State.NEW; -import static org.apache.qpid.transport.Connection.State.OPEN; -import static org.apache.qpid.transport.Connection.State.OPENING; -import static org.apache.qpid.transport.Connection.State.RESUMING; +import static org.apache.qpid.transport.Connection.State.*; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -39,6 +35,13 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; @@ -118,9 +121,8 @@ public class Connection extends ConnectionInvoker private ConnectionSettings conSettings; private SecurityLayer securityLayer; private String _clientId; + private NetworkConnection network; - private static final AtomicLong idGenerator = new AtomicLong(0); - private final long _connectionId = idGenerator.incrementAndGet(); private final AtomicBoolean connectionLost = new AtomicBoolean(false); public Connection() {} @@ -202,16 +204,20 @@ public class Connection extends ConnectionInvoker public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { - connect(host, port, vhost, username, password, ssl,"PLAIN"); + connect(host, port, vhost, username, password, ssl, "PLAIN"); } - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs) { - connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP); + connect(host, port, vhost, username, password, ssl, saslMechs, "TCP", Collections.<String, Object>emptyMap()); } + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol) + { + connect(host, port, vhost, username, password, ssl, saslMechs, protocol, Collections.<String, Object>emptyMap()); + } - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol, Map<String,Object> clientProps) { ConnectionSettings settings = new ConnectionSettings(); settings.setHost(host); @@ -222,24 +228,32 @@ public class Connection extends ConnectionInvoker settings.setUseSSL(ssl); settings.setSaslMechs(saslMechs); settings.setClientProperties(clientProps); + settings.setProtocol(protocol); connect(settings); } public void connect(ConnectionSettings settings) { - synchronized (lock) { conSettings = settings; state = OPENING; userID = settings.getUsername(); delegate = new ClientDelegate(settings); + + securityLayer = new SecurityLayer(); + securityLayer.init(this); + + SSLContextFactory sslFactory = null; + if (settings.isUseSSL()) + { + sslFactory = new SSLContextFactory(settings.getKeyStorePath(), settings.getKeyStorePassword(), settings.getKeyStoreCertType()); + } - TransportBuilder transport = new TransportBuilder(); - transport.init(this); - this.sender = transport.buildSenderPipe(); - transport.buildReceiverPipe(this); - this.securityLayer = transport.getSecurityLayer(); + OutgoingNetworkTransport transport = Transport.getOutgoingTransport(settings.getProtocol()); + Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + network = transport.connect(settings, receiver, sslFactory); + sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); send(new ProtocolHeader(1, 0, 10)); @@ -251,20 +265,14 @@ public class Connection extends ConnectionInvoker if (error != null) { - ConnectionException t = error; + ConnectionException ce = error; error = null; - try + if (ce instanceof ProtocolVersionException) { - close(); + closed(); + ce.rethrow(); } - catch (ConnectionException ce) - { - if (!(t instanceof ProtocolVersionException)) - { - throw ce; - } - } - t.rethrow(); + ce.rethrow(); } switch (state) @@ -352,11 +360,6 @@ public class Connection extends ConnectionInvoker _sessionFactory = sessionFactory; } - public long getConnectionId() - { - return _connectionId; - } - public ConnectionDelegate getConnectionDelegate() { return delegate; @@ -414,7 +417,7 @@ public class Connection extends ConnectionInvoker return channelMax; } - void setChannelMax(int max) + public void setChannelMax(int max) { channelMax = max; } @@ -437,7 +440,7 @@ public class Connection extends ConnectionInvoker } } - void map(Session ssn, int channel) + public void map(Session ssn, int channel) { synchronized (lock) { @@ -446,7 +449,7 @@ public class Connection extends ConnectionInvoker } } - void unmap(Session ssn) + public void unmap(Session ssn) { synchronized (lock) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java deleted file mode 100644 index 86af97bf7e..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.SocketAddress; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; - -public interface NetworkDriver extends Sender<java.nio.ByteBuffer> -{ - // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to - // it using the SSLContextFactory if provided - void open(int port, InetAddress destination, ProtocolEngine engine, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) - throws OpenException; - - // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which - // processes incoming connections with ProtocolEngines and SSLEngines created from the factories - // (in the case of an SSLContextFactory, if provided) - void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; - - // Returns the remote address of the underlying socket - SocketAddress getRemoteAddress(); - - // Returns the local address of the underlying socket - SocketAddress getLocalAddress(); - - /** - * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been - * read in seconds - */ - void setMaxReadIdle(int idleTime); - - /** - * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been - * written in seconds - */ - void setMaxWriteIdle(int idleTime); - -}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java deleted file mode 100644 index c38afe5dd5..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -/** - * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing - * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned - * from here if the underlying implementation supports them. - */ -public interface NetworkDriverConfiguration -{ - // Taken from Socket - Boolean getKeepAlive(); - Boolean getOOBInline(); - Boolean getReuseAddress(); - Integer getSoLinger(); // null means off - Integer getSoTimeout(); - Boolean getTcpNoDelay(); - Integer getTrafficClass(); - - // The amount of memory in bytes to allocate to the incoming buffer - Integer getReceiveBufferSize(); - - // The amount of memory in bytes to allocate to the outgoing buffer - Integer getSendBufferSize(); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index f21df251da..249dfabc85 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -40,6 +40,8 @@ public class ServerDelegate extends ConnectionDelegate { protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private List<Object> _locales; private List<Object> _mechanisms; private Map<String, Object> _clientProperties; @@ -75,10 +77,7 @@ public class ServerDelegate extends ConnectionDelegate if (mechanism == null || mechanism.length() == 0) { - conn.connectionTune - (getChannelMax(), - org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, getHeartbeatMax()); + conn.connectionTune(getChannelMax(), MAX_FRAME_SIZE, 0, getHeartbeatMax()); return; } @@ -118,10 +117,7 @@ public class ServerDelegate extends ConnectionDelegate if (ss.isComplete()) { ss.dispose(); - conn.connectionTune - (getChannelMax(), - org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, getHeartbeatMax()); + conn.connectionTune(getChannelMax(), MAX_FRAME_SIZE, 0, getHeartbeatMax()); conn.setAuthorizationID(ss.getAuthorizationID()); } else @@ -153,27 +149,6 @@ public class ServerDelegate extends ConnectionDelegate } @Override - public void connectionTuneOk(Connection conn, ConnectionTuneOk ok) - { - int okChannelMax = ok.getChannelMax(); - - if (okChannelMax > getChannelMax()) - { - _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " + - "client connectionTuneOk returned a channelMax (" + okChannelMax + - ") above the servers offered limit (" + getChannelMax() +")"); - - //Due to the error we must forcefully close the connection without negotiation - conn.getSender().close(); - return; - } - - //0 means no implied limit, except available server resources - //(or that forced by protocol limitations [0xFFFF]) - conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax); - } - - @Override public void connectionOpen(Connection conn, ConnectionOpen open) { conn.connectionOpenOk(Collections.emptyList()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 214d4534c1..fc6a2ff2d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.transport; - import static org.apache.qpid.transport.Option.COMPLETED; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.TIMELY_REPLY; @@ -89,7 +88,7 @@ public class Session extends SessionInvoker private int channel; private SessionDelegate delegate; private SessionListener listener = new DefaultSessionListener(); - private long timeout = 60000; + private long timeout = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); private boolean autoSync = false; private boolean incomingInit; @@ -676,23 +675,23 @@ public class Session extends SessionInvoker { send(m); } - catch (SenderException e) + catch (SessionException se) + { + se.rethrow(); + } + catch (TransportException te) { if (!closing) { // if we are not closing then this will happen // again on resume - log.error(e, "error sending command"); + log.error(te, "error sending command"); } else { - e.rethrow(); + throw new SessionClosedException(te); } } - if (autoSync) - { - sync(); - } // flush every 64K commands to avoid ambiguity on // wraparound @@ -702,17 +701,21 @@ public class Session extends SessionInvoker { sessionFlush(COMPLETED); } - catch (SenderException e) + catch (SessionException se) + { + se.rethrow(); + } + catch (TransportException te) { if (!closing) { - // if expiry is > 0 then this will happen + // if we are not closing then this will happen // again on resume - log.error(e, "error sending flush (periodic)"); + log.error(te, "error sending flush (periodic)"); } else { - e.rethrow(); + throw new SessionClosedException(te); } } } @@ -757,7 +760,7 @@ public class Session extends SessionInvoker { if (state != CLOSED) { - throw new SessionException( + throw new SessionTimeoutException( String.format("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionTimeoutException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionTimeoutException.java new file mode 100644 index 0000000000..ff604d2391 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionTimeoutException.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +/** + * SessionTimeoutException + */ +public class SessionTimeoutException extends SessionException +{ + /** serialVersionUID */ + private static final long serialVersionUID = 1L; + + public SessionTimeoutException(String message) + { + super(message); + } + + public SessionTimeoutException(Throwable cause) + { + super("session timed out", null, cause); + } + + @Override + public void rethrow() + { + throw new SessionTimeoutException(this); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java deleted file mode 100644 index c08909c6e4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.security.SecurityLayer; - -public class TransportBuilder -{ - private Connection con; - private ConnectionSettings settings; - private NetworkTransport transport; - private SecurityLayer securityLayer = new SecurityLayer(); - - public void init(Connection con) throws TransportException - { - this.con = con; - this.settings = con.getConnectionSettings(); - transport = Transport.getTransport(); - transport.init(settings); - securityLayer.init(con); - } - - public Sender<ProtocolEvent> buildSenderPipe() - { - ConnectionSettings settings = con.getConnectionSettings(); - - // Io layer - Sender<ByteBuffer> sender = transport.sender(); - - // Security layer - sender = securityLayer.sender(sender); - - Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize()); - return dis; - } - - public void buildReceiverPipe(Receiver<ProtocolEvent> delegate) - { - Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate)); - - // Security layer - receiver = securityLayer.receiver(receiver); - - //Io layer - transport.receiver(receiver); - } - - public SecurityLayer getSecurityLayer() - { - return securityLayer; - } - -}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java deleted file mode 100644 index 1a8d277bba..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network; - -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.ConnectionListener; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.security.sasl.SASLReceiver; -import org.apache.qpid.transport.network.security.sasl.SASLSender; - -/** - * ConnectionBinding - * - */ - -public abstract class ConnectionBinding - implements Binding<Connection,ByteBuffer> -{ - - public static Binding<Connection,ByteBuffer> get(final Connection connection) - { - return new ConnectionBinding() - { - public Connection connection() - { - return connection; - } - }; - } - - public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate) - { - return new ConnectionBinding() - { - public Connection connection() - { - Connection conn = new Connection(); - conn.setConnectionDelegate(delegate); - return conn; - } - }; - } - - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - public abstract Connection connection(); - - public Connection endpoint(Sender<ByteBuffer> sender) - { - Connection conn = connection(); - - if (conn.getConnectionSettings() != null && - conn.getConnectionSettings().isUseSASLEncryption()) - { - sender = new SASLSender(sender); - conn.addConnectionListener((ConnectionListener)sender); - } - - // XXX: hardcoded max-frame - Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE); - conn.setSender(dis); - return conn; - } - - public Receiver<ByteBuffer> receiver(Connection conn) - { - if (conn.getConnectionSettings() != null && - conn.getConnectionSettings().isUseSASLEncryption()) - { - SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn))); - conn.addConnectionListener((ConnectionListener)receiver); - return receiver; - } - else - { - return new InputHandler(new Assembler(conn)); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index 9df84eef90..37359dd900 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -18,14 +18,13 @@ * under the License. * */ -package org.apache.qpid.protocol; +package org.apache.qpid.transport.network; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; -public interface ProtocolEngineFactory -{ - - // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); - -}
\ No newline at end of file +public interface IncomingNetworkTransport extends NetworkTransport +{ + public void accept(ConnectionSettings settings, ReceiverFactory factory, SSLContextFactory sslFactory); +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 69b3a0ce45..9e8bfb7e02 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,29 +7,41 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -package org.apache.qpid.transport.network.io; +package org.apache.qpid.transport.network; -import java.net.Socket; +import java.net.SocketAddress; import java.nio.ByteBuffer; import org.apache.qpid.transport.Sender; -public interface IoContext +public interface NetworkConnection { Sender<ByteBuffer> getSender(); - IoReceiver getReceiver(); + void close(); + + long getReadBytes(); + + long getWrittenBytes(); + + /** + * Returns the remote address of the underlying socket. + */ + SocketAddress getRemoteAddress(); - Socket getSocket(); -} + /** + * Returns the local address of the underlying socket. + */ + SocketAddress getLocalAddress(); +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java index 5e12d7e7c6..516df5ddbf 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -20,19 +20,19 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.ConnectionSettings; +import java.net.SocketAddress; public interface NetworkTransport { - public void init(ConnectionSettings settings); - - public Sender<ByteBuffer> sender(); - - public void receiver(Receiver<ByteBuffer> delegate); - public void close(); -}
\ No newline at end of file + + /** + * Returns the address of the underlying socket. + */ + public SocketAddress getAddress(); + + /** + * Check whether the transport is compatible with a network protocol. + */ + public boolean isCompatible(String protocol); +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 7a24d6e15a..ff86ba481f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,15 +18,15 @@ * under the License. * */ -package org.apache.qpid.client.transport; +package org.apache.qpid.transport.network; -import java.io.IOException; +import java.nio.ByteBuffer; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; -public interface ITransportConnection +public interface OutgoingNetworkTransport extends NetworkTransport { - void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) - throws IOException; -} + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory); +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index 3ab875262a..8101118c6f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -18,11 +18,16 @@ * under the License. * */ - package org.apache.qpid.transport.network; +import java.util.LinkedList; +import java.util.List; + import org.apache.qpid.transport.TransportException; +/** + * Loads the network transport class. + */ public class Transport { public static final String TCP = "tcp"; @@ -37,33 +42,85 @@ public class Transport public static final boolean WINDOWS = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport"; - public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; // TODO + public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO - private final static Class<?> transportClass; + public static final String OUTGOING = "outgoing"; + public static final String INCOMING = "incoming"; + + private static final List<String> _outgoing = new LinkedList<String>(); + private static final List<String> _incoming = new LinkedList<String>(); - static + public static void registerIncomingTransport(String transport) { - try - { - transportClass = Class.forName(System.getProperty("qpid.transport", IO_TRANSPORT)); - } - catch(Exception e) - { - throw new Error("Error occured while loading Qpid Transport",e); - } + registerTransport(_incoming, transport); + } + + public static void registerOutgoingTransport(Class<? extends OutgoingNetworkTransport> transport) + { + registerTransport(_outgoing, transport.getName()); + } + + public static void registerOutgoingTransport(String transport) + { + registerTransport(_outgoing, transport); } - public static NetworkTransport getTransport() throws TransportException + private static void registerTransport(List<String> registered, String transport) { + registered.add(transport); + } + + public static IncomingNetworkTransport getIncomingTransport() throws TransportException + { + return (IncomingNetworkTransport) getTransport(INCOMING, _incoming, MINA_TRANSPORT, null); + } + + public static OutgoingNetworkTransport getOutgoingTransport() throws TransportException + { + return (OutgoingNetworkTransport) getTransport(OUTGOING, _outgoing, IO_TRANSPORT, null); + } + + public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException + { + return (OutgoingNetworkTransport) getTransport(OUTGOING, _outgoing, IO_TRANSPORT, protocol); + } + + private static NetworkTransport getTransport(String direction, List<String> registered, String defaultTransport, String protocol) + { + for (String transport : registered) + { + try + { + Class<?> clazz = Class.forName(transport); + NetworkTransport network = (NetworkTransport) clazz.newInstance(); + if (protocol == null || network.isCompatible(protocol)) + { + return network; + } + } + catch (Exception e) + { + // Ignore and move to next class + } + } + try { - return (NetworkTransport)transportClass.newInstance(); + String transport = System.getProperty("qpid.transport." + direction, defaultTransport); + Class<?> clazz = Class.forName(transport); + NetworkTransport network = (NetworkTransport) clazz.newInstance(); + if (protocol == null || network.isCompatible(protocol)) + { + return network; + } } catch (Exception e) { - throw new TransportException("Error while creating a new transport instance",e); + throw new TransportException("Error while creating a new " + direction + " transport instance", e); } + + throw new TransportException("Cannot create " + direction + " transport supporting " + protocol); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java deleted file mode 100644 index ecc5f6d07c..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.apache.qpid.transport.network.io; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQFrameDecodingException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQMethodBodyFactory; -import org.apache.qpid.framing.BodyFactory; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentBodyFactory; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentHeaderBodyFactory; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.HeartbeatBodyFactory; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.transport.Receiver; - -public class InputHandler_0_9 implements Receiver<ByteBuffer> -{ - - private AMQVersionAwareProtocolSession _session; - private MethodRegistry _registry; - private BodyFactory bodyFactory; - private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; - - static - { - _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); - _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); - _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); - } - - public InputHandler_0_9(AMQVersionAwareProtocolSession session) - { - _session = session; - _registry = _session.getMethodRegistry(); - } - - public void closed() - { - // AS FIXME: implement - } - - public void exception(Throwable t) - { - // TODO: propogate exception to things - t.printStackTrace(); - } - - public void received(ByteBuffer buf) - { - org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); - try - { - final byte type = in.get(); - if (type == AMQMethodBody.TYPE) - { - bodyFactory = new AMQMethodBodyFactory(_session); - } - else - { - bodyFactory = _bodiesSupported[type]; - } - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); - } - - final int channel = in.getUnsignedShort(); - final long bodySize = in.getUnsignedInt(); - - // bodySize can be zero - if ((channel < 0) || (bodySize < 0)) - { - throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize, null); - } - - AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); - - byte marker = in.get(); - if ((marker & 0xFF) != 0xCE) - { - throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type, null); - } - - try - { - frame.getBodyFrame().handle(frame.getChannel(), _session); - } - catch (AMQException e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - catch (AMQFrameDecodingException e) - { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java deleted file mode 100644 index 8530240dcc..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.TransportException; - -import java.io.IOException; - -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketAddress; - -import java.nio.ByteBuffer; - - -/** - * IoAcceptor - * - */ - -public class IoAcceptor<E> extends Thread -{ - - - private ServerSocket socket; - private Binding<E,ByteBuffer> binding; - - public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) - throws IOException - { - socket = new ServerSocket(); - socket.setReuseAddress(true); - socket.bind(address); - this.binding = binding; - - setName(String.format("IoAcceptor - %s", socket.getInetAddress())); - } - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() throws IOException - { - if (!socket.isClosed()) - { - socket.close(); - } - } - - public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) - throws IOException - { - this(new InetSocketAddress(host, port), binding); - } - - public void run() - { - while (true) - { - try - { - Socket sock = socket.accept(); - IoTransport<E> transport = new IoTransport<E>(sock, binding,false); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java new file mode 100644 index 0000000000..54a64d71fa --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IoNetworkConnection + */ +public class IoNetworkConnection implements NetworkConnection +{ + private static final Logger _log = LoggerFactory.getLogger(IoNetworkConnection.class); + private static final AtomicLong _id = new AtomicLong(0); + + private final int _sendBufferSize; + private final Socket _socket; + private final long _timeout; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final Thread _receiverThread; + private Sender<ByteBuffer> _sender; + + public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> receiver, + int sendBufferSize, int receiveBufferSize, long timeout) + { + _sendBufferSize = sendBufferSize; + _socket = socket; + _timeout = timeout; + + try + { + IoNetworkHandler handler = new IoNetworkHandler(socket, receiver, receiveBufferSize); + _receiverThread = Threading.getThreadFactory().createThread(handler); + } + catch(Exception e) + { + throw new Error("Error creating IoNetworkTransport thread",e); + } + + // Start receiver thread as daemon + _receiverThread.setDaemon(true); + _receiverThread.setName(String.format("IoNetworkTransport-%d-%s", _id.getAndIncrement(), socket.getRemoteSocketAddress())); + _receiverThread.start(); + + // Create sender + _sender = new IoSender(_socket, 2 * _sendBufferSize, _timeout); + } + + public void close() + { + if (!_closed.getAndSet(true)) + { + try + { + if (Transport.WINDOWS) + { + _socket.close(); + } + else + { + _socket.shutdownInput(); + } + _receiverThread.join(_timeout); + if (_receiverThread.isAlive()) + { + throw new TransportException("receiverThread join timed out"); + } + if (_sender != null) + { + _sender.close(); + _sender = null; + } + } + catch (InterruptedException e) + { + // ignore + } + catch (IOException e) + { + // ignore + } + } + } + + public SocketAddress getRemoteAddress() + { + return _socket.getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socket.getLocalSocketAddress(); + } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } + + public long getReadBytes() + { + return 0; + } + + public long getWrittenBytes() + { + return 0; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java new file mode 100644 index 0000000000..01804f7436 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.util.Logger; + +/** + * IoNetworkHandler handles incoming data and sends it to the {@link Receiver}. + */ +public class IoNetworkHandler implements Runnable +{ + private static final Logger _log = Logger.get(IoNetworkHandler.class); + + private final Receiver<ByteBuffer> _receiver; + private final int _bufSize; + private final Socket _socket; + + public IoNetworkHandler(Socket socket, Receiver<ByteBuffer> receiver, int bufSize) + { + _socket = socket; + _receiver = receiver; + _bufSize = bufSize; + } + + public void run() + { + final int threshold = _bufSize / 2; + + // I set the read buffer size simillar to SO_RCVBUF + // Haven't tested with a lower value to see if it's better or worse + byte[] buffer = new byte[_bufSize]; + try + { + InputStream in = _socket.getInputStream(); + int read = 0; + int offset = 0; + while ((read = in.read(buffer, offset, _bufSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + _receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[_bufSize]; + } + } + } + } + catch (Throwable t) + { + if (!(Transport.WINDOWS && + t instanceof SocketException && + t.getMessage().equalsIgnoreCase("socket closed") && + _socket.isClosed()) && _socket.isConnected()) + { + _receiver.exception(t); + } + } + finally + { + _receiver.closed(); + try + { + _socket.close(); + } + catch(Exception e) + { + _log.warn(e, "Error closing socket"); + } + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index dd6a37eca2..9e9cafa1a0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -24,54 +24,57 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.util.Logger; -public class IoNetworkTransport implements NetworkTransport, IoContext +public class IoNetworkTransport implements OutgoingNetworkTransport { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } + private static final Logger _log = Logger.get(IoNetworkTransport.class); - private static final Logger log = Logger.get(IoNetworkTransport.class); + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - private Socket socket; - private Sender<ByteBuffer> sender; - private IoReceiver receiver; - private long timeout = 60000; - private ConnectionSettings settings; + private Socket _socket; + private IoNetworkConnection _connection; + private long _timeout = 60000; - public void init(ConnectionSettings settings) + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslfactory) { - try + if (!settings.getProtocol().equalsIgnoreCase(Transport.TCP)) { - this.settings = settings; - InetAddress address = InetAddress.getByName(settings.getHost()); - socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(settings.isTcpNodelay()); + throw new TransportException("Invalid protocol: " + settings.getProtocol()); + } - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + boolean noDelay = Boolean.getBoolean("amqj.tcpNoDelay"); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + + try + { + _socket = new Socket(); + + _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize()); - socket.setSendBufferSize(settings.getWriteBufferSize()); - socket.setReceiveBufferSize(settings.getReadBufferSize()); + _socket.setTcpNoDelay(noDelay); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); + _socket.setReuseAddress(true); - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize()); - socket.connect(new InetSocketAddress(address, settings.getPort())); + InetAddress address = InetAddress.getByName(settings.getHost()); + _socket.connect(new InetSocketAddress(address, settings.getPort())); } catch (SocketException e) { @@ -81,36 +84,29 @@ public class IoNetworkTransport implements NetworkTransport, IoContext { throw new TransportException("Error connecting to broker", e); } - } - public void receiver(Receiver<ByteBuffer> delegate) - { - receiver = new IoReceiver(this, delegate, - 2*settings.getReadBufferSize() , timeout); - } - - public Sender<ByteBuffer> sender() - { - return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + + return _connection; } public void close() { - - } - - public Sender<ByteBuffer> getSender() - { - return sender; + if (_connection != null) + { + _connection.close(); + _connection = null; + } } - public IoReceiver getReceiver() + public SocketAddress getAddress() { - return receiver; + return _socket.getLocalSocketAddress(); } - public Socket getSocket() + public boolean isCompatible(String protocol) { - return socket; + return (protocol.equalsIgnoreCase(Transport.TCP) || + protocol.equalsIgnoreCase(Transport.SOCKET)); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java deleted file mode 100644 index 19a683d505..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.util.Logger; - -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * IoReceiver - * - */ - -final class IoReceiver implements Runnable -{ - - private static final Logger log = Logger.get(IoReceiver.class); - - private final IoContext ioCtx; - private final Receiver<ByteBuffer> receiver; - private final int bufferSize; - private final Socket socket; - private final long timeout; - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread receiverThread; - private final boolean shutdownBroken = - ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); - - public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver, - int bufferSize, long timeout) - { - this.ioCtx = ioCtx; - this.receiver = receiver; - this.bufferSize = bufferSize; - this.socket = ioCtx.getSocket(); - this.timeout = timeout; - - try - { - receiverThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new Error("Error creating IOReceiver thread",e); - } - receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - receiverThread.start(); - } - - void close(boolean block) - { - if (!closed.getAndSet(true)) - { - try - { - if (shutdownBroken) - { - socket.close(); - } - else - { - socket.shutdownInput(); - } - if (block && Thread.currentThread() != receiverThread) - { - receiverThread.join(timeout); - if (receiverThread.isAlive()) - { - throw new TransportException("join timed out"); - } - } - } - catch (InterruptedException e) - { - throw new TransportException(e); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - - public void run() - { - final int threshold = bufferSize / 2; - - // I set the read buffer size simillar to SO_RCVBUF - // Haven't tested with a lower value to see if it's better or worse - byte[] buffer = new byte[bufferSize]; - try - { - InputStream in = socket.getInputStream(); - int read = 0; - int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) - { - if (read > 0) - { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) - { - offset = 0; - buffer = new byte[bufferSize]; - } - } - } - } - catch (Throwable t) - { - if (!(shutdownBroken && - t instanceof SocketException && - t.getMessage().equalsIgnoreCase("socket closed") && - closed.get())) - { - receiver.exception(t); - } - } - finally - { - receiver.closed(); - try - { - socket.close(); - } - catch(Exception e) - { - log.warn(e, "Error closing socket"); - } - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 66b97e8225..380e64c0f4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -25,51 +25,49 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.util.Logger; - public final class IoSender implements Runnable, Sender<ByteBuffer> { - - private static final Logger log = Logger.get(IoSender.class); + private static final Logger _log = Logger.get(IoSender.class); // by starting here, we ensure that we always test the wraparound // case, we should probably make this configurable somehow so that // we can test other cases as well - private final static int START = Integer.MAX_VALUE - 10; - - private final IoContext ioCtx; - private final long timeout; - private final Socket socket; - private final OutputStream out; - - private final byte[] buffer; - private volatile int head = START; - private volatile int tail = START; - private volatile boolean idle = true; - private final Object notFull = new Object(); - private final Object notEmpty = new Object(); - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread senderThread; - - private volatile Throwable exception = null; - - - public IoSender(IoContext ioCtx, int bufferSize, long timeout) + private static final int START = Integer.MAX_VALUE - 10; + private static final AtomicLong _id = new AtomicLong(0); + + private final long _timeout; + private final Socket _socket; + private final OutputStream _out; + + private final byte[] _buffer; + private volatile int _head = START; + private volatile int _tail = START; + private volatile boolean _idle = true; + private final Object _notFull = new Object(); + private final Object _notEmpty = new Object(); + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final Thread _senderThread; + + private volatile Throwable _exception = null; + + public IoSender(Socket socket, int bufferSize, long timeout) { - this.ioCtx = ioCtx; - this.socket = ioCtx.getSocket(); - this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 - this.timeout = timeout; + _socket = socket; + _buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 + _timeout = timeout; try { - out = socket.getOutputStream(); + _out = socket.getOutputStream(); } catch (IOException e) { @@ -78,16 +76,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - senderThread = Threading.getThreadFactory().createThread(this); + _senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { throw new Error("Error creating IOSender thread",e); } - senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - senderThread.start(); + _senderThread.setDaemon(true); + _senderThread.setName(String.format("IoSender-%d-%s", _id.getAndIncrement(), socket.getRemoteSocketAddress())); + _senderThread.start(); } private static final int pof2(int n) @@ -102,31 +100,31 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void send(ByteBuffer buf) { - if (closed.get()) + if (_closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderException("sender is closed", _exception); } - final int size = buffer.length; + final int size = _buffer.length; int remaining = buf.remaining(); while (remaining > 0) { - final int hd = head; - final int tl = tail; + final int hd = _head; + final int tl = _tail; if (hd - tl >= size) { flush(); - synchronized (notFull) + synchronized (_notFull) { long start = System.currentTimeMillis(); long elapsed = 0; - while (!closed.get() && head - tail >= size && elapsed < timeout) + while (!_closed.get() && _head - _tail >= size && elapsed < _timeout) { try { - notFull.wait(timeout - elapsed); + _notFull.wait(_timeout - elapsed); } catch (InterruptedException e) { @@ -135,14 +133,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> elapsed += System.currentTimeMillis() - start; } - if (closed.get()) + if (_closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderException("sender is closed", _exception); } - if (head - tail >= size) + if (_head - _tail >= size) { - throw new SenderException(String.format("write timed out: %s, %s", head, tail)); + throw new SenderException(String.format("write timed out: %s, %s", _head, _tail)); } } continue; @@ -161,90 +159,97 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> length = Math.min(size - hd_idx, remaining); } - buf.get(buffer, hd_idx, length); - head += length; + buf.get(_buffer, hd_idx, length); + _head += length; remaining -= length; } + flush(); } public void flush() { - if (idle) + if (_idle) { - synchronized (notEmpty) + synchronized (_notEmpty) { - notEmpty.notify(); + _notEmpty.notify(); } } } public void close() { - close(true); - } - - void close(boolean reportException) - { - if (!closed.getAndSet(true)) + if (!_closed.getAndSet(true)) { - synchronized (notFull) + synchronized (_notFull) { - notFull.notify(); + _notFull.notify(); } - synchronized (notEmpty) + synchronized (_notEmpty) { - notEmpty.notify(); + _notEmpty.notify(); } try { - if (Thread.currentThread() != senderThread) + if (Thread.currentThread() != _senderThread) { - senderThread.join(timeout); - if (senderThread.isAlive()) + if (Transport.WINDOWS) + { + _socket.close(); + } + else + { + _socket.shutdownOutput(); + } + _senderThread.join(_timeout); + if (_senderThread.isAlive()) { - throw new SenderException("join timed out"); + throw new SenderException("senderThread join timed out"); } } - ioCtx.getReceiver().close(false); } catch (InterruptedException e) { - throw new SenderException(e); + throw new SenderException("Close interrupted", e); + } + catch (IOException e) + { + throw new SenderException("IO error closing", e); } - if (reportException && exception != null) + if (_exception != null) { - throw new SenderException(exception); + throw new SenderException(_exception); } } } public void run() { - final int size = buffer.length; + final int size = _buffer.length; while (true) { - final int hd = head; - final int tl = tail; + final int hd = _head; + final int tl = _tail; if (hd == tl) { - if (closed.get()) + if (_closed.get()) { break; } - idle = true; + _idle = true; - synchronized (notEmpty) + synchronized (_notEmpty) { - while (head == tail && !closed.get()) + while (_head == _tail && !_closed.get()) { try { - notEmpty.wait(); + _notEmpty.wait(); } catch (InterruptedException e) { @@ -253,7 +258,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - idle = false; + _idle = false; continue; } @@ -273,21 +278,21 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - out.write(buffer, tl_idx, length); + _out.write(_buffer, tl_idx, length); } catch (IOException e) { - log.error(e, "error in write thread"); - exception = e; - close(false); + _log.error(e, "error in write thread"); + _exception = e; + close(); break; } - tail += length; - if (head - tl >= size) + _tail += length; + if (_head - tl >= size) { - synchronized (notFull) + synchronized (_notFull) { - notFull.notify(); + _notFull.notify(); } } } @@ -297,7 +302,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { try { - socket.setSoTimeout(i); + _socket.setSoTimeout(i); } catch (Exception e) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java deleted file mode 100644 index bfdbb34978..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLSender; -import org.apache.qpid.transport.util.Logger; - -/** - * This class provides a socket based transport using the java.io - * classes. - * - * The following params are configurable via JVM arguments - * TCP_NO_DELAY - amqj.tcpNoDelay - * SO_RCVBUF - amqj.receiveBufferSize - * SO_SNDBUF - amqj.sendBufferSize - */ -public final class IoTransport<E> implements IoContext -{ - - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } - - private static final Logger log = Logger.get(IoTransport.class); - - private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; - private static int readBufferSize = Integer.getInteger - ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); - private static int writeBufferSize = Integer.getInteger - ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); - - private Socket socket; - private Sender<ByteBuffer> sender; - private E endpoint; - private IoReceiver receiver; - private long timeout = 60000; - - IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl) - { - this.socket = socket; - - if (ssl) - { - SSLEngine engine = null; - SSLContext sslCtx; - try - { - sslCtx = createSSLContext(); - } - catch (Exception e) - { - throw new TransportException("Error creating SSL Context", e); - } - - try - { - engine = sslCtx.createSSLEngine(); - engine.setUseClientMode(true); - } - catch(Exception e) - { - throw new TransportException("Error creating SSL Engine", e); - } - - this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), - 2*readBufferSize, timeout); - - log.info("SSL Sender and Receiver initiated"); - } - else - { - this.sender = new IoSender(this, 2*writeBufferSize, timeout); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, binding.receiver(endpoint), - 2*readBufferSize, timeout); - } - } - - public Sender<ByteBuffer> getSender() - { - return sender; - } - - public IoReceiver getReceiver() - { - return receiver; - } - - public Socket getSocket() - { - return socket; - } - - public static final <E> E connect(String host, int port, - Binding<E,ByteBuffer> binding, - boolean ssl) - { - Socket socket = createSocket(host, port); - IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl); - return transport.endpoint; - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate, - boolean ssl) - { - return connect(host, port, ConnectionBinding.get(delegate),ssl); - } - - public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port, boolean ssl) - { - connect(host, port, new Binding_0_9(session),ssl); - } - - private static class Binding_0_9 - implements Binding<AMQVersionAwareProtocolSession,ByteBuffer> - { - - private AMQVersionAwareProtocolSession session; - - Binding_0_9(AMQVersionAwareProtocolSession session) - { - this.session = session; - } - - public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender) - { - session.setSender(sender); - return session; - } - - public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn) - { - return new InputHandler_0_9(ssn); - } - - } - - private static Socket createSocket(String host, int port) - { - try - { - InetAddress address = InetAddress.getByName(host); - Socket socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); - - socket.setSendBufferSize(writeBufferSize); - socket.setReceiveBufferSize(readBufferSize); - - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); - - socket.connect(new InetSocketAddress(address, port)); - return socket; - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker", e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - } - - private SSLContext createSSLContext() throws Exception - { - String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); - String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); - String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509"); - - String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath); - String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword); - String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509"); - - SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword, - trustStoreCertType,keyStorePath, - keyStorePassword,keyStoreCertType); - - return sslContextFactory.buildServerContext(); - - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java deleted file mode 100644 index 0f2c0d0226..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExecutorThreadModel; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.SessionUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.NetworkDriverConfiguration; -import org.apache.qpid.transport.OpenException; - -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver -{ - - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - ProtocolEngine _protocolEngine; - private boolean _useNIO = false; - private int _processors = 4; - private boolean _executorPool = false; - private SSLContextFactory _sslFactory = null; - private IoConnector _socketConnector; - private IoAcceptor _acceptor; - private IoSession _ioSession; - private ProtocolEngineFactory _factory; - private boolean _protectIO; - private NetworkDriverConfiguration _config; - private Throwable _lastException; - private boolean _acceptingConnections = false; - - private WriteFuture _lastWriteFuture; - - private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); - - static - { - org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - //override the MINA defaults to prevent use of the PooledByteBufferAllocator - org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) - { - _useNIO = useNIO; - _processors = processors; - _executorPool = executorPool; - _protectIO = protectIO; - } - - public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, - ProtocolEngine protocolEngine, IoSession session) - { - _useNIO = useNIO; - _processors = processors; - _executorPool = executorPool; - _protectIO = protectIO; - _protocolEngine = protocolEngine; - _ioSession = session; - _ioSession.setAttachment(_protocolEngine); - } - - public MINANetworkDriver() - { - - } - - public MINANetworkDriver(IoConnector ioConnector) - { - _socketConnector = ioConnector; - } - - public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) - { - _socketConnector = ioConnector; - _protocolEngine = engine; - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - _factory = factory; - _config = config; - - if (_useNIO) - { - _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, - new NewThreadExecutor()); - } - else - { - _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); - } - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - if (config != null) - { - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getSendBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - } - - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - if (addresses != null && addresses.length > 0) - { - for (InetAddress addr : addresses) - { - try - { - _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); - } - } - } - else - { - try - { - _acceptor.bind(new InetSocketAddress(port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to *:%1s", port)); - } - } - _acceptingConnections = true; - } - - public SocketAddress getRemoteAddress() - { - return _ioSession.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _ioSession.getLocalAddress(); - } - - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLContextFactory sslFactory) throws OpenException - { - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - if (_useNIO) - { - _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); - } - else - { - _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking - // connector - } - - SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - String s = ""; - StackTraceElement[] trace = Thread.currentThread().getStackTrace(); - for(StackTraceElement elt : trace) - { - if(elt.getClassName().contains("Test")) - { - s = elt.getClassName(); - break; - } - } - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); - scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); - scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); - - // Don't have the connector's worker thread wait around for other - // connections (we only use - // one SocketConnector per connection at the moment anyway). This allows - // short-running - // clients (like unit tests) to complete quickly. - if (_socketConnector instanceof SocketConnector) - { - ((SocketConnector) _socketConnector).setWorkerTimeout(0); - } - - ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); - future.join(); - if (!future.isConnected()) - { - throw new OpenException("Could not open connection", _lastException); - } - _ioSession = future.getSession(); - _ioSession.setAttachment(engine); - engine.setNetworkDriver(this); - _protocolEngine = engine; - } - - public void setMaxReadIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); - } - - public void setMaxWriteIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); - } - - public void close() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - if (_ioSession != null) - { - _ioSession.close(); - } - } - - public void flush() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - } - - public void send(ByteBuffer msg) - { - org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); - minaBuf.put(msg); - minaBuf.flip(); - _lastWriteFuture = _ioSession.write(minaBuf); - } - - public void setIdleTimeout(int i) - { - // MINA doesn't support setting SO_TIMEOUT - } - - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception - { - if (_protocolEngine != null) - { - _protocolEngine.exception(throwable); - } - else - { - _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); - } - _lastException = throwable; - } - - /** - * Invoked when a message is received on a particular protocol session. Note - * that a protocol session is directly tied to a particular physical - * connection. - * - * @param protocolSession - * the protocol session that received the message - * @param message - * the message itself (i.e. a decoded frame) - * - * @throws Exception - * if the message cannot be processed - */ - public void messageReceived(IoSession protocolSession, Object message) throws Exception - { - if (message instanceof org.apache.mina.common.ByteBuffer) - { - ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); - } - else - { - throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); - } - } - - public void sessionClosed(IoSession protocolSession) throws Exception - { - ((ProtocolEngine) protocolSession.getAttachment()).closed(); - } - - public void sessionCreated(IoSession protocolSession) throws Exception - { - // Configure the session with SSL if necessary - SessionUtil.initialize(protocolSession); - if (_executorPool) - { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - } - else - { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - } - // Do we want to have read/write buffer limits? - if (_protectIO) - { - //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); - - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); - writefilter.attach(chain); - - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - } - - if (_ioSession == null) - { - _ioSession = protocolSession; - } - - if (_acceptingConnections) - { - // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); - MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); - protocolEngine.setNetworkDriver(newDriver); - } - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - if (IdleStatus.WRITER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).writerIdle(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).readerIdle(); - } - } - - private ProtocolEngine getProtocolEngine() - { - return _protocolEngine; - } - - public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) - { - _factory = engineFactory; - _acceptingConnections = acceptingConnections; - } - - public void setProtocolEngine(ProtocolEngine protocolEngine) - { - _protocolEngine = protocolEngine; - if (_ioSession != null) - { - _ioSession.setAttachment(protocolEngine); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java deleted file mode 100644 index b89eed48b0..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.mina; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.mina.common.*; - -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.executor.ExecutorFilter; - -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.ConnectionBinding; - -import org.apache.qpid.transport.util.Logger; - -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; - -import static org.apache.qpid.transport.util.Functions.*; - -/** - * MinaHandler - * - * @author Rafael H. Schloming - */ -//RA making this public until we sort out the package issues -public class MinaHandler<E> implements IoHandler -{ - /** Default buffer size for pending messages reads */ - private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; - /** Default buffer size for pending messages writes */ - private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; - private static final int MAX_RCVBUF = 64*1024; - - private static final Logger log = Logger.get(MinaHandler.class); - - static - { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - } - - private final Binding<E,java.nio.ByteBuffer> binding; - - private MinaHandler(Binding<E,java.nio.ByteBuffer> binding) - { - this.binding = binding; - } - - public void messageReceived(IoSession ssn, Object obj) - { - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - ByteBuffer buf = (ByteBuffer) obj; - try - { - attachment.receiver.received(buf.buf()); - } - catch (Throwable t) - { - log.error(t, "exception handling buffer %s", str(buf.buf())); - throw new RuntimeException(t); - } - } - - public void messageSent(IoSession ssn, Object obj) - { - // do nothing - } - - public void exceptionCaught(IoSession ssn, Throwable e) - { - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - attachment.receiver.exception(e); - } - - /** - * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the - * session, which filters the events handled by this handler. The filter chain consists of, handing off events - * to an optional protectio - * - * @param session The MINA session. - * @throws Exception Any underlying exceptions are allowed to fall through to MINA. - */ - public void sessionCreated(IoSession session) throws Exception - { - log.debug("Protocol session created for session " + System.identityHashCode(session)); - - if (Boolean.getBoolean("protectio")) - { - try - { - //Add IO Protection Filters - IoFilterChain chain = session.getFilterChain(); - - session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize( - Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize( - Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); - writefilter.attach(chain); - session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - - log.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - } - - public void sessionOpened(final IoSession ssn) - { - log.debug("opened: %s", this); - E endpoint = binding.endpoint(new MinaSender(ssn)); - Attachment<E> attachment = - new Attachment<E>(endpoint, binding.receiver(endpoint)); - - // We need to synchronize and notify here because the MINA - // connect future returns the session prior to the attachment - // being set. This is arguably a bug in MINA. - synchronized (ssn) - { - ssn.setAttachment(attachment); - ssn.notifyAll(); - } - } - - public void sessionClosed(IoSession ssn) - { - log.debug("closed: %s", ssn); - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - attachment.receiver.closed(); - ssn.setAttachment(null); - } - - public void sessionIdle(IoSession ssn, IdleStatus status) - { - // do nothing - } - - private static class Attachment<E> - { - - E endpoint; - Receiver<java.nio.ByteBuffer> receiver; - - Attachment(E endpoint, Receiver<java.nio.ByteBuffer> receiver) - { - this.endpoint = endpoint; - this.receiver = receiver; - } - } - - public static final void accept(String host, int port, - Binding<?,java.nio.ByteBuffer> binding) - throws IOException - { - accept(new InetSocketAddress(host, port), binding); - } - - public static final <E> void accept(SocketAddress address, - Binding<E,java.nio.ByteBuffer> binding) - throws IOException - { - IoAcceptor acceptor = new SocketAcceptor(); - acceptor.bind(address, new MinaHandler<E>(binding)); - } - - public static final <E> E connect(String host, int port, - Binding<E,java.nio.ByteBuffer> binding) - { - return connect(new InetSocketAddress(host, port), binding); - } - - public static final <E> E connect(SocketAddress address, - Binding<E,java.nio.ByteBuffer> binding) - { - MinaHandler<E> handler = new MinaHandler<E>(binding); - SocketConnector connector = new SocketConnector(); - IoServiceConfig acceptorConfig = connector.getDefaultConfig(); - acceptorConfig.setThreadModel(ThreadModel.MANUAL); - SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig(); - scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize"); - if (sendBufferSize != null && sendBufferSize > 0) - { - scfg.setSendBufferSize(sendBufferSize); - } - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize"); - if (receiveBufferSize != null && receiveBufferSize > 0) - { - scfg.setReceiveBufferSize(receiveBufferSize); - } - else if (scfg.getReceiveBufferSize() > MAX_RCVBUF) - { - scfg.setReceiveBufferSize(MAX_RCVBUF); - } - connector.setWorkerTimeout(0); - ConnectFuture cf = connector.connect(address, handler); - cf.join(); - IoSession ssn = cf.getSession(); - - // We need to synchronize and wait here because the MINA - // connect future returns the session prior to the attachment - // being set. This is arguably a bug in MINA. - synchronized (ssn) - { - while (ssn.getAttachment() == null) - { - try - { - ssn.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - Attachment<E> attachment = (Attachment<E>) ssn.getAttachment(); - return attachment.endpoint; - } - - public static final void accept(String host, int port, - ConnectionDelegate delegate) - throws IOException - { - accept(host, port, ConnectionBinding.get(delegate)); - } - - public static final Connection connect(String host, int port, - ConnectionDelegate delegate) - { - return connect(host, port, ConnectionBinding.get(delegate)); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java new file mode 100644 index 0000000000..0ccdfdd72e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java @@ -0,0 +1,74 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinaNetworkConnection implements NetworkConnection +{ + private static final Logger _log = LoggerFactory.getLogger(MinaNetworkConnection.class); + + private IoSession _session; + private Sender<ByteBuffer> _sender; + + public MinaNetworkConnection(IoSession session) + { + _session = session; + _sender = new MinaSender(_session); + } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } + + public void close() + { + _session.close(); + } + + public SocketAddress getRemoteAddress() + { + return _session.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _session.getLocalAddress(); + } + + public long getReadBytes() + { + return _session.getReadBytes(); + } + + public long getWrittenBytes() + { + return _session.getWrittenBytes(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java new file mode 100644 index 0000000000..77c3ed318b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import static org.apache.qpid.transport.util.Functions.str; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.util.SessionUtil; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MinaNetworkHandler + */ +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class MinaNetworkHandler extends IoHandlerAdapter +{ + private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); + + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + + private NetworkTransport _transport = null; + private SSLContextFactory _sslFactory = null; + private ReceiverFactory _factory = null; + + public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) + { + _transport = transport; + _sslFactory = sslFactory; + _factory = factory; + } + + public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory) + { + this(transport, sslFactory, null); + } + + @Override + public void messageReceived(IoSession session, Object message) + { + Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment(); + ByteBuffer buf = (ByteBuffer) message; + try + { + receiver.received(buf.buf()); + } + catch (RuntimeException re) + { + receiver.exception(re); + } + } + + @Override + public void exceptionCaught(IoSession ssn, Throwable e) + { + _log.error("Exception caught by Mina", e); + Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment(); + receiver.exception(e); + } + + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an optional protectio + * + * @param session The MINA session. + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ + @Override + public void sessionCreated(IoSession session) throws Exception + { + _log.debug("Created session: " + System.identityHashCode(session)); + SessionUtil.initialize(session); + + IoFilterChain chain = session.getFilterChain(); + + if (_sslFactory != null) + { + chain.addFirst("sslFilter", new SSLFilter(_sslFactory.buildServerContext())); + } + + // Add IO Protection Filters + if (Boolean.getBoolean("protectio")) + { + try + { + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readfilter.attach(chain); + _log.info("Using IO Read Filter Protection"); + } + catch (Exception e) + { + _log.error("Unable to attach IO Read Filter Protection", e); + } + } + + if (_factory != null) + { + NetworkConnection network = new MinaNetworkConnection(session); + + Receiver<java.nio.ByteBuffer> receiver = _factory.newReceiver(_transport, network); + session.setAttachment(receiver); + } + } + + @Override + public void sessionClosed(IoSession session) throws Exception + { + _log.debug("closed: " + System.identityHashCode(session)); + Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment(); + receiver.closed(); + } + + @Override + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (status == IdleStatus.READER_IDLE || status == IdleStatus.BOTH_IDLE) + { + session.close(); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java new file mode 100644 index 0000000000..d186a7a16e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -0,0 +1,245 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.transport.vmpipe.VmPipeAcceptor; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; +import edu.emory.mathcs.backport.java.util.concurrent.Executors; + +public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingNetworkTransport +{ + private static final Logger _log = LoggerFactory.getLogger(MinaNetworkTransport.class); + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + private MinaNetworkConnection _connection; + private ConnectionSettings _settings; + private SocketAddress _address; + private IoConnector _connector; + private IoSession _session; + private IoAcceptor _acceptor; + private Receiver<ByteBuffer> _receiver; + private ExecutorService _executor; + private int _processors = 4; + + static + { + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory) + { + _log.debug("Initialising MINA transport"); + + _settings = settings; + _receiver = delegate; + _processors = Runtime.getRuntime().availableProcessors(); + + if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP)) + { + _address = new InetSocketAddress(_settings.getHost(), _settings.getPort()); + _executor = Executors.newFixedThreadPool(_processors); + _connector = new SocketConnector(1, _executor); + } + else if (_settings.getProtocol().equalsIgnoreCase(Transport.VM)) + { + _address = new VmPipeAddress(_settings.getPort()); + _connector = new VmPipeConnector(); + } + else if (_settings.getProtocol().equalsIgnoreCase(Transport.SOCKET)) + { + Socket socket = ExistingSocketConnector.removeOpenSocket(_settings.getHost()); + if (socket == null) + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport"); + } + _address = socket.getRemoteSocketAddress(); + _executor = Executors.newFixedThreadPool(_processors); + _connector = new ExistingSocketConnector(1, _executor); + ((ExistingSocketConnector) _connector).setOpenSocket(socket); + } + else + { + throw new TransportException("Unknown protocol: " + _settings.getProtocol()); + } + _log.info("Connecting to broker on: " + _address); + + String s = "-"; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for (StackTraceElement elt : trace) + { + if (elt.getClassName().endsWith("Test")) + { + s = "-" + elt.getClassName(); + } + } + + IoServiceConfig cfg = _connector.getDefaultConfig(); + cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s)); + + // Socket based connection configuration only + if (_connector instanceof SocketConnector) + { + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + scfg.setSendBufferSize(sendBufferSize); + scfg.setReceiveBufferSize(receiveBufferSize); + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + ((SocketConnector) _connector).setWorkerTimeout(0); + } + + // Connect to the broker + ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg); + future.join(); + if (!future.isConnected()) + { + throw new TransportException("Could not open connection"); + } + _session = future.getSession(); + _session.setAttachment(_receiver); + + _connection = new MinaNetworkConnection(_session); + return _connection; + } + + public void accept(ConnectionSettings settings, ReceiverFactory factory, SSLContextFactory sslFactory) + { + _processors = Runtime.getRuntime().availableProcessors(); + + if (settings.getProtocol().equalsIgnoreCase(Transport.TCP)) + { + _executor = Executors.newFixedThreadPool(_processors); + _acceptor = new SocketAcceptor(_processors, _executor); + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Acceptor)")); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + sc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + sc.setSendBufferSize(sendBufferSize); + sc.setReceiveBufferSize(receiveBufferSize); + + if (settings.getHost().equals("*")) + { + _address = new InetSocketAddress(settings.getPort()); + } + else + { + _address = new InetSocketAddress(settings.getHost(), settings.getPort()); + } + } + else if (settings.getProtocol().equalsIgnoreCase(Transport.VM)) + { + _acceptor = new VmPipeAcceptor(); + _address = new VmPipeAddress(settings.getPort()); + } + else + { + throw new TransportException("Unknown protocol: " + settings.getProtocol()); + } + + try + { + _acceptor.bind(_address, new MinaNetworkHandler(this, sslFactory, factory)); + } + catch (IOException e) + { + throw new TransportException("Could not bind to " + _address, e); + } + } + + public SocketAddress getAddress() + { + return _address; + } + + public void close() + { + if (_connection != null) + { + _connection.close(); + } + if (_acceptor != null) + { + _acceptor.unbindAll(); + _acceptor = null; + } + if (_session != null && _session.isConnected()) + { + CloseFuture closed = _session.close(); + closed.join(); + _session = null; + } + if (_executor != null) + { + _executor.shutdownNow(); + _executor = null; + } + } + + public boolean isCompatible(String protocol) + { + return (protocol.equalsIgnoreCase(Transport.TCP) || + protocol.equalsIgnoreCase(Transport.SOCKET) || + protocol.equalsIgnoreCase(Transport.VM)); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 22b9c5e784..2e55d6b761 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -25,66 +25,56 @@ import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MinaSender */ - public class MinaSender implements Sender<java.nio.ByteBuffer> { - private static final int TIMEOUT = 2 * 60 * 1000; - - private final IoSession session; - private WriteFuture lastWrite = null; + private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); + + private final IoSession _session; + private WriteFuture _lastWrite; + private int _idleTimeout = 0; public MinaSender(IoSession session) { - this.session = session; + _session = session; } - public void send(java.nio.ByteBuffer buf) + public synchronized void send(java.nio.ByteBuffer msg) { - if (session.isClosing()) - { - throw new TransportException("attempted to write to a closed socket"); - } - - synchronized (this) - { - lastWrite = session.write(ByteBuffer.wrap(buf)); - } + ByteBuffer mina = ByteBuffer.allocate(msg.limit()); + mina.put(msg); + mina.flip(); + _lastWrite = _session.write(mina); } - public void flush() + public synchronized void flush() { - // pass + if (_lastWrite != null) + { + _lastWrite.join(); + } } - public synchronized void close() + public void close() { - // MINA will sometimes throw away in-progress writes when you - // ask it to close - synchronized (this) - { - if (lastWrite != null) - { - lastWrite.join(); - } - } - CloseFuture closed = session.close(); + // MINA will sometimes throw away in-progress writes when you ask it to close + flush(); + CloseFuture closed = _session.close(); closed.join(); } public void setIdleTimeout(int i) { - //noop + _idleTimeout = i; } public long getIdleTimeout() { - return 0; + return _idleTimeout; } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java index 84e66c25bd..784df61057 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java @@ -41,7 +41,7 @@ public class NioHandler implements Runnable private Receiver<ByteBuffer> _receiver; private SocketChannel _ch; private ByteBuffer _readBuf; - private static Map<Long,NioSender> _handlers = new ConcurrentHashMap<Long,NioSender>(); + private static Map<Integer, NioSender> _handlers = new ConcurrentHashMap<Integer, NioSender>(); private NioHandler(){} @@ -87,7 +87,7 @@ public class NioHandler implements Runnable con.setSender(new Disassembler(sender, 64*1024 - 1)); con.setConnectionDelegate(delegate); - _handlers.put(con.getConnectionId(),sender); + _handlers.put(System.identityHashCode(con), sender); _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR); @@ -125,11 +125,9 @@ public class NioHandler implements Runnable //throw new EOFException("The underlying socket/channel has closed"); } - public static void startBatchingFrames(int connectionId) + public static void startBatchingFrames(Connection con) { - NioSender sender = _handlers.get(connectionId); + NioSender sender = _handlers.get(System.identityHashCode(con)); sender.setStartBatching(); } - - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/vm/VMBrokerCreationException.java index 6bef6216bd..ab748ca904 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/vm/VMBrokerCreationException.java @@ -18,26 +18,27 @@ * under the License. * */ -package org.apache.qpid.client.transport; +package org.apache.qpid.transport.vm; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.TransportException; /** - * AMQTransportConnectionException indicates a failure to establish a connection through the transporting medium, to - * an AMQP broker. + * VMBrokerCreationException represents failure to create an in VM broker on the vm transport medium. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to connect through the transport medium. + * <tr><td> Represent failure to create an in VM broker. * </table> - * - * @todo Error code never used. This is not an AMQException. */ -public class AMQTransportConnectionException extends AMQException +public class VMBrokerCreationException extends TransportException { - public AMQTransportConnectionException(AMQConstant errorCode, String message, Throwable cause) + public VMBrokerCreationException(String message) + { + super(message); + } + + public VMBrokerCreationException(String message, Throwable cause) { - super(errorCode, message, cause); + super(message, cause); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/vm/VmBroker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/vm/VmBroker.java new file mode 100644 index 0000000000..2c42659b45 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/vm/VmBroker.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.vm; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.qpid.BrokerOptions; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to start an InVm broker instance. + */ +public class VmBroker +{ + private static final String BROKER_INSTANCE = "org.apache.qpid.server.BrokerInstance"; + + private static final Logger _logger = LoggerFactory.getLogger(VmBroker.class); + + private static Object _instance = null; + private static BrokerOptions _options = null; + + public static void createVMBroker() throws VMBrokerCreationException + { + if (_instance == null) + { + BrokerOptions options = new BrokerOptions(); + options.setProtocol("vm"); + options.setBind("localhost"); + options.setPorts(1); + + createVMBroker(options); + } + } + + public static void createVMBroker(BrokerOptions options) throws VMBrokerCreationException + { + synchronized (VmBroker.class) + { + if (_instance == null) + { + try + { + Class<?> brokerClass = Class.forName(BROKER_INSTANCE); + Object brokerInstance = brokerClass.newInstance(); + + Class<?>[] types = { BrokerOptions.class }; + Object[] args = { options }; + Method startup = brokerClass.getMethod("startup", types); + startup.invoke(brokerInstance, args); + + _instance = brokerInstance; + } + catch (Exception e) + { + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + _logger.warn("Unable to create InVM broker instance: " + because); + + throw new VMBrokerCreationException(because + " Stopped InVM broker instance creation", e); + } + _logger.info("Created InVM broker instance."); + } + } + } + + public static void killVMBroker() + { + _logger.info("Killing InVM broker"); + synchronized (VmBroker.class) + { + if (_instance != null) + { + try + { + Class<?> brokerClass = Class.forName(BROKER_INSTANCE); + Method shutdown = brokerClass.getMethod("shutdown", new Class[0]); + + shutdown.invoke(_instance); + + _instance = null; + } + catch (Exception e) + { + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + _logger.warn("Error shutting down broker instance: " + because); + } + } + _logger.info("Stopped InVM broker instance"); + } + } +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 8b470d555e..0b2954af76 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -25,7 +25,9 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import junit.framework.TestCase; import junit.framework.TestResult; @@ -36,6 +38,9 @@ public class QpidTestCase extends TestCase { protected static final Logger _logger = Logger.getLogger(QpidTestCase.class); + protected Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); + protected Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); + /** * Some tests are excluded when the property test.excludes is set to true. * An exclusion list is either a file (prop test.excludesfile) which contains one test name @@ -127,4 +132,108 @@ public class QpidTestCase extends TestCase return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } + + /** + * Set a System property that is to be applied only to the external test + * broker. + * + * This is a convenience method to enable the setting of a -Dproperty=value + * entry in QPID_OPTS + * + * This is only useful for the External Java Broker tests. + * + * @param property the property name + * @param value the value to set the property to + */ + protected void setBrokerOnlySystemProperty(String property, String value) + { + if (!_propertiesSetForBroker.containsKey(property)) + { + _propertiesSetForBroker.put(property, value); + } + + } + + /** + * Set a System (-D) property for this test run. + * + * This convenience method copies the current VMs System Property + * for the external VM Broker. + * + * @param property the System property to set + */ + protected void setSystemProperty(String property) + { + setSystemProperty(property, System.getProperty(property)); + } + + /** + * Set a System property for the duration of this test. + * + * When the test run is complete the value will be reverted. + * + * The values set using this method will also be propogated to the external + * Java Broker via a -D value defined in QPID_OPTS. + * + * If the value should not be set on the broker then use + * setTestClientSystemProperty(). + * + * @param property the property to set + * @param value the new value to use + */ + protected void setSystemProperty(String property, String value) + { + // Record the value for the external broker + _propertiesSetForBroker.put(property, value); + + //Set the value for the test client vm aswell. + setTestClientSystemProperty(property, value); + } + + /** + * Set a System (-D) property for the external Broker of this test. + * + * @param property The property to set + * @param value the value to set it to. + */ + protected void setTestClientSystemProperty(String property, String value) + { + if (!_propertiesSetForTestOnly.containsKey(property)) + { + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } + + System.setProperty(property, value); + } + + /** + * Restore the System property values that were set before this test run. + */ + protected void revertSystemProperties() + { + for (String key : _propertiesSetForTestOnly.keySet()) + { + String value = _propertiesSetForTestOnly.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + + _propertiesSetForTestOnly.clear(); + + // We don't change the current VMs settings for Broker only properties + // so we can just clear this map + _propertiesSetForBroker.clear(); + } + + protected void tearDown() throws Exception + { + revertSystemProperties(); + } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 375a326654..6b7e204a3d 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,22 +20,28 @@ */ package org.apache.qpid.transport; -import org.apache.mina.util.AvailablePortFinder; - -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.transport.Option.*; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; -import java.util.List; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.io.IOException; -import static org.apache.qpid.transport.Option.*; +import org.apache.mina.util.AvailablePortFinder; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.Connection.State; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.io.IoNetworkHandler; +import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.util.Waiter; /** * ConnectionTest @@ -43,34 +49,20 @@ import static org.apache.qpid.transport.Option.*; public class ConnectionTest extends QpidTestCase implements SessionListener { - - private static final Logger log = Logger.get(ConnectionTest.class); - + private static final String TEST_TRANSPORT = "org.apache.qpid.transport.TestNetworkTransport"; + public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); private List<MessageTransfer> incoming = new ArrayList<MessageTransfer>(); - private IoAcceptor _ioa = null; - - protected void setUp() throws Exception { - super.setUp(); - + setSystemProperty("qpid.transport.outgoing", TEST_TRANSPORT); port = AvailablePortFinder.getNextAvailable(12000); } - protected void tearDown() throws Exception - { - if (_ioa != null) - { - _ioa.close(); - } - - super.tearDown(); - } - public void opened(Session ssn) {} public void resumed(Session ssn) {} @@ -174,6 +166,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } }); + conn.setState(State.OPEN); conn.connect("localhost", port, null, "guest", "guest", false); return conn; } @@ -211,7 +204,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener private void startServer(final ProtocolHeader protocolHeader) { - ConnectionDelegate server = new ServerDelegate() + ConnectionDelegate delegate = new ServerDelegate() { @Override public void init(Connection conn, ProtocolHeader hdr) @@ -230,18 +223,39 @@ public class ConnectionTest extends QpidTestCase implements SessionListener return ssn; } }; - + try { - _ioa = new IoAcceptor("localhost", port, ConnectionBinding.get(server)); + final ServerSocket server = new ServerSocket(); + server.setReuseAddress(true); + server.bind(new InetSocketAddress(InetAddress.getByName("localhost"), port)); + + final Connection conn = new Connection(); + conn.setConnectionDelegate(delegate); + + Thread accept = new Thread(new Runnable() + { + public void run() + { + try + { + Socket client = server.accept(); + conn.setSender(new Disassembler(new IoSender(client, 32768, 30000), MAX_FRAME_SIZE)); + Thread receiver = new Thread(new IoNetworkHandler(client, new InputHandler(new Assembler(conn)), 32768)); + receiver.start(); + } + catch (IOException e) + { + // ignore + } + } + }); + accept.start(); } catch (IOException e) { - e.printStackTrace(); - fail("Unable to start Server for test due to:" + e.getMessage()); + // ignore } - - _ioa.start(); } public void testClosedNotificationAndWriteToClosed() throws Exception diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java new file mode 100644 index 0000000000..b289d48828 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; + +/** + * Test implementation of {@link NetworkTransport}. + * + * Exposes a given {@link SocketAddress} as both local and remote addresses, + * either by setting the address directly or by setting the host and port + * separately. All other methods are empty. + */ +public class TestNetworkConnection implements NetworkConnection +{ + private String _host = "127.0.0.1"; + private int _port = 1; + private SocketAddress _address = null; + + public SocketAddress getRemoteAddress() + { + return (_address != null) ? _address : new InetSocketAddress(_host, _port); + } + + public SocketAddress getLocalAddress() + { + return (_address != null) ? _address : new InetSocketAddress(_host, _port); + } + + public void setPort(int port) + { + _port = port; + } + + public int getPort() + { + return _port; + } + + public void setHost(String host) + { + _host = host; + } + + public void setAddress(SocketAddress address) + { + _address = address; + } + + public void close() + { + } + + public long getReadBytes() + { + return 0; + } + + public Sender<ByteBuffer> getSender() + { + return null; + } + + public long getWrittenBytes() + { + return 0; + } +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java deleted file mode 100644 index 957a7190ee..0000000000 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; - -/** - * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, - * so if this class is being used and some methods are to be used, then please update those. - */ -public class TestNetworkDriver implements NetworkDriver -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - private String _remoteHost = "127.0.0.1"; - private String _localHost = "127.0.0.1"; - private int _port = 1; - private SocketAddress _localAddress = null; - private SocketAddress _remoteAddress = null; - - public TestNetworkDriver() - { - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - } - - public SocketAddress getLocalAddress() - { - return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port); - } - - public SocketAddress getRemoteAddress() - { - return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); - } - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLContextFactory sslFactory) throws OpenException - { - - } - - public void setMaxReadIdle(int idleTime) - { - - } - - public void setMaxWriteIdle(int idleTime) - { - - } - - public void close() - { - - } - - public void flush() - { - - } - - public void send(ByteBuffer msg) - { - - } - - public void setIdleTimeout(int i) - { - - } - - public void setPort(int port) - { - _port = port; - } - - public int getPort() - { - return _port; - } - - public void setLocalHost(String host) - { - _localHost = host; - } - - public void setRemoteHost(String host) - { - _remoteHost = host; - } - - public void setLocalAddress(SocketAddress address) - { - _localAddress = address; - } - - public void setRemoteAddress(SocketAddress address) - { - _remoteAddress = address; - } -} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkTransport.java new file mode 100644 index 0000000000..a55c3b0fcd --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkTransport.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.io.IoNetworkTransport; + +/** + * Test implementation of {@link NetworkTransport}. + * + * Exposes a {@link SocketAddress}, all other methods are as in {@link IoNetworkTransport}. + */ +public class TestNetworkTransport extends IoNetworkTransport +{ + private String _host = "127.0.0.1"; + private int _port = 1; + private SocketAddress _address = null; + + public SocketAddress getAddress() + { + return (_address != null) ? _address : new InetSocketAddress(_host, _port); + } + + public void setPort(int port) + { + _port = port; + } + + public int getPort() + { + return _port; + } + + public void setHost(String host) + { + _host = host; + } + + public void setAddress(SocketAddress address) + { + _address = address; + } +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkTransportTest.java index fc8e689ca4..e4761cc670 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkTransportTest.java @@ -24,38 +24,50 @@ package org.apache.qpid.transport.network.mina; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import junit.framework.TestCase; - +import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ReceiverFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.OpenException; - -public class MINANetworkDriverTest extends TestCase +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; + +public class MinaNetworkTransportTest extends QpidTestCase { + public void testNothing() { assertTrue(true); } private static final String TEST_DATA = "YHALOTHAR"; - private static int TEST_PORT = 2323; - private NetworkDriver _server; - private NetworkDriver _client; - private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read + private static int TEST_PORT = AvailablePortFinder.getNextAvailable(10000); + private IncomingNetworkTransport _server; + private OutgoingNetworkTransport _client; + private CountingReceiver _countingEngine; // Keeps a count of how many bytes it's read private Exception _thrownEx; + private ConnectionSettings _settings; + private NetworkConnection _network; @Override - public void setUp() + public void setUp() throws Exception { - _server = new MINANetworkDriver(); - _client = new MINANetworkDriver(); + _settings = new ConnectionSettings(); + _settings.setHost(InetAddress.getLocalHost().getHostName()); + _settings.setPort(TEST_PORT); + + _server = new MinaNetworkTransport(); + _client = new MinaNetworkTransport(); _thrownEx = null; - _countingEngine = new CountingProtocolEngine(); + _countingEngine = new CountingReceiver(); + // increment the port to prevent tests clashing with each other when // the port is in TIMED_WAIT state. TEST_PORT++; @@ -86,18 +98,18 @@ public class MINANetworkDriverTest extends TestCase { try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_settings, _countingEngine, null); } - catch (OpenException e) + catch (Exception e) { _thrownEx = e; } assertNotNull("Open should have failed since no engine bound", _thrownEx); - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_settings, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_settings, _countingEngine, null); } /** @@ -108,16 +120,16 @@ public class MINANetworkDriverTest extends TestCase */ public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); + _client.connect(_settings, _countingEngine, null); _client.close(); _server.close(); try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_settings, _countingEngine, null); } - catch (OpenException e) + catch (Exception e) { _thrownEx = e; } @@ -132,18 +144,19 @@ public class MINANetworkDriverTest extends TestCase { try { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); } - catch (BindException e) + catch (TransportException e) { fail("First bind should not fail"); } try { - _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + IncomingNetworkTransport second = new MinaNetworkTransport(); + second.accept(_settings, new EchoSingletonFactory(), null); } - catch (BindException e) + catch (TransportException e) { _thrownEx = e; } @@ -161,14 +174,14 @@ public class MINANetworkDriverTest extends TestCase public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); + _network = _client.connect(_settings, _countingEngine, null); // Tell the counting engine how much data we're sending _countingEngine.setNewLatch(TEST_DATA.getBytes().length); // Send the data and wait for up to 2 seconds to get it back - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getLatch().await(2, TimeUnit.SECONDS); // Check what we got @@ -180,15 +193,14 @@ public class MINANetworkDriverTest extends TestCase * @throws BindException * @throws OpenException * @throws UnknownHostException - * */ public void testSetReadIdle() throws BindException, UnknownHostException, OpenException { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); + _network = _client.connect(_settings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _client.setMaxReadIdle(1); + _network.getSender().setIdleTimeout(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); } @@ -203,10 +215,10 @@ public class MINANetworkDriverTest extends TestCase public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); + _network = _client.connect(_settings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _client.setMaxWriteIdle(1); + _network.getSender().setIdleTimeout(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); } @@ -223,10 +235,10 @@ public class MINANetworkDriverTest extends TestCase public void testClosed() throws BindException, UnknownHostException, OpenException { // Open a connection from a counting engine to an echo engine - EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.bind(TEST_PORT, null, factory, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - EchoProtocolEngine serverEngine = null; + EchoSingletonFactory factory = new EchoSingletonFactory(); + _server.accept(_settings, factory, null); + _network = _client.connect(_settings, _countingEngine, null); + EchoReceiver serverEngine = null; while (serverEngine == null) { serverEngine = factory.getEngine(); @@ -253,7 +265,7 @@ public class MINANetworkDriverTest extends TestCase } assertTrue("Server should have been closed", serverEngine.getClosed()); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_settings, _countingEngine, null); _countingEngine.setClosed(false); assertFalse("Client should not have been closed", _countingEngine.getClosed()); _countingEngine.setNewLatch(1); @@ -278,15 +290,14 @@ public class MINANetworkDriverTest extends TestCase */ public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - + _server.accept(_settings, new EchoSingletonFactory(), null); + _network = _client.connect(_settings, _countingEngine, null); assertEquals("Exception should not have been thrown", 1, _countingEngine.getExceptionLatch().getCount()); _countingEngine.setErrorOnNextRead(true); _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); @@ -300,36 +311,34 @@ public class MINANetworkDriverTest extends TestCase */ public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_settings, new EchoSingletonFactory(), null); + _network = _client.connect(_settings, _countingEngine, null); assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), - _client.getRemoteAddress()); + _network.getRemoteAddress()); } - private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory + private class EchoSingletonFactory implements ReceiverFactory { - EchoProtocolEngine _engine = null; + EchoReceiver _engine = null; - public ProtocolEngine newProtocolEngine(NetworkDriver driver) + public Receiver<ByteBuffer> newReceiver(NetworkTransport transport, NetworkConnection network) { if (_engine == null) { - _engine = new EchoProtocolEngine(); - _engine.setNetworkDriver(driver); + _engine = new EchoReceiver(); } return getEngine(); } - public EchoProtocolEngine getEngine() + public EchoReceiver getEngine() { return _engine; } } - public class CountingProtocolEngine implements ProtocolEngine + public class CountingReceiver implements Receiver<ByteBuffer> { - - protected NetworkDriver _driver; + protected NetworkConnection _network; public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>(); private int _readBytes; private CountDownLatch _latch = new CountDownLatch(0); @@ -360,43 +369,14 @@ public class MINANetworkDriverTest extends TestCase return _readBytes; } - public SocketAddress getRemoteAddress() - { - if (_driver != null) - { - return _driver.getRemoteAddress(); - } - else - { - return null; - } - } - - public SocketAddress getLocalAddress() - { - if (_driver != null) - { - return _driver.getLocalAddress(); - } - else - { - return null; - } - } - - public long getWrittenBytes() - { - return 0; - } - public void readerIdle() { _readerHasBeenIdle = true; } - public void setNetworkDriver(NetworkDriver driver) + public void setNetworkConnection(NetworkConnection network) { - _driver = driver; + _network = network; } public void writeFrame(AMQDataBlock frame) @@ -463,14 +443,12 @@ public class MINANetworkDriverTest extends TestCase } - private class EchoProtocolEngine extends CountingProtocolEngine + private class EchoReceiver extends CountingReceiver { - public void received(ByteBuffer msg) { super.received(msg); msg.rewind(); - _driver.send(msg); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index ca10126aa7..986297bfe1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -30,7 +30,6 @@ import javax.jms.TextMessage; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.transport.TransportConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java index a8a23c2c41..bfe9e89de2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.client; -import java.util.Hashtable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -34,10 +32,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; -import javax.naming.spi.InitialContextFactory; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java index 29b4dd82a7..eba99c30d6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java @@ -21,11 +21,7 @@ package org.apache.qpid.client; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.client.transport.TransportConnection; -import java.io.File; import java.security.Provider; import java.security.Security; import java.util.List; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java index ec222ff03d..c2a73bc797 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java @@ -26,10 +26,10 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.transport.vm.VMBrokerCreationException; +import org.apache.qpid.transport.vm.VmBroker; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,13 +47,13 @@ public class FailoverMethodTest extends InternalBrokerBaseCase implements Except public void createBroker() throws Exception { super.createBroker(); - TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); + VmBroker.createVMBroker(); } @Override public void stopBroker() { - TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); + VmBroker.killVMBrokers(); super.stopBroker(); } @@ -112,7 +112,7 @@ public class FailoverMethodTest extends InternalBrokerBaseCase implements Except } } - public void testFailoverSingleDelay() throws URLSyntaxException, AMQVMBrokerCreationException, + public void testFailoverSingleDelay() throws URLSyntaxException, VMBrokerCreationException, InterruptedException, JMSException { String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='2000',retries='3''"; @@ -174,7 +174,7 @@ public class FailoverMethodTest extends InternalBrokerBaseCase implements Except * @throws InterruptedException * @throws JMSException */ - public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException, + public void testNoFailover() throws URLSyntaxException, VMBrokerCreationException, InterruptedException, JMSException { int CONNECT_DELAY = 2000; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java index 8fd2c085c3..c72beb01a1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.logging; import junit.framework.AssertionFailedError; + +import org.apache.qpid.BrokerOptions; +import org.apache.qpid.qmf.schema.BrokerSchema.AgentClass.BrokerBankProperty; import org.apache.qpid.server.Main; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.util.LogMonitor; @@ -203,7 +206,7 @@ public class BrokerLoggingTest extends AbstractTestLogging 1, findMatches(TESTID).size()); //3 - String defaultLog4j = _configFile.getParent() + "/" + Main.DEFAULT_LOG_CONFIG_FILENAME; + String defaultLog4j = _configFile.getParent() + "/" + BrokerOptions.DEFAULT_LOG_CONFIG_FILENAME; assertTrue("Log4j file(" + defaultLog4j + ") details not correctly logged:" + getMessageString(log), getMessageString(log).endsWith(defaultLog4j)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 6203e8a194..93833546d2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -20,26 +20,15 @@ */ package org.apache.qpid.server.queue; -import junit.framework.TestCase; -import junit.framework.Assert; import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import javax.jms.*; import javax.naming.NamingException; -import javax.naming.Context; -import javax.naming.spi.InitialContextFactory; -import java.util.Hashtable; import java.util.HashMap; import java.util.Map; @@ -47,7 +36,6 @@ public class PriorityTest extends QpidBrokerTestCase { private static final int TIMEOUT = 1500; - private static final Logger _logger = Logger.getLogger(PriorityTest.class); protected final String QUEUE = "PriorityQueue"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index 74f50e8659..842a92c1a5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -21,6 +21,17 @@ package org.apache.qpid.server.queue; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.transport.vm.VmBroker; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java index 3e03ad0872..de393b9105 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/CauseFailureInVM.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.test.framework.qpid; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.test.framework.CauseFailure; import org.apache.qpid.test.framework.BrokerLifecycleAware; +import org.apache.qpid.transport.vm.VmBroker; /** * <p/><table id="crc"><caption>CRC Card</caption> @@ -64,7 +64,7 @@ public class CauseFailureInVM implements CauseFailure { int liveBroker = inVMTest.getLiveBroker(); - TransportConnection.killVMBroker(liveBroker); + VmBroker.killVMBroker(); ApplicationRegistry.remove(liveBroker); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java index b92a72a654..97a4592486 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/framework/qpid/InVMBrokerDecorator.java @@ -23,11 +23,11 @@ package org.apache.qpid.test.framework.qpid; import junit.framework.Test; import junit.framework.TestResult; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.test.framework.BrokerLifecycleAware; import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.transport.vm.VMBrokerCreationException; +import org.apache.qpid.transport.vm.VmBroker; import org.apache.qpid.junit.extensions.SetupTaskAware; import org.apache.qpid.junit.extensions.WrappedSuiteTestDecorator; @@ -89,9 +89,9 @@ public class InVMBrokerDecorator extends WrappedSuiteTestDecorator try { ApplicationRegistry.getInstance(1); - TransportConnection.createVMBroker(1); + VmBroker.createVMBroker(); } - catch (AMQVMBrokerCreationException e) + catch (VMBrokerCreationException e) { throw new RuntimeException("In-VM broker creation failed: " + e.getMessage(), e); } @@ -103,7 +103,7 @@ public class InVMBrokerDecorator extends WrappedSuiteTestDecorator public void run() { // Ensure that the in-vm broker is cleaned up so that the next test starts afresh. - TransportConnection.killVMBroker(1); + VmBroker.killVMBroker(); ApplicationRegistry.remove(1); } }); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 3a5f676ca6..87eae32cf8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -23,7 +23,6 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.test.utils.QpidBrokerTestCase; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 79e2ff8148..aae8b1feb9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -24,7 +24,6 @@ import junit.textui.TestRunner; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index f0794c9dab..521e9c9872 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -25,7 +25,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.*; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 278b9e9c04..97f37dc2b9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; +import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolSessionTest extends QpidBrokerTestCase { - private static class AMQProtSession extends AMQProtocolSession + private static class TestProtocolSession extends AMQProtocolSession { - - public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { super(protocolHandler,connection); } - public TestNetworkDriver getNetworkDriver() + public NetworkConnection getNetworkConnection() { - return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); + return new TestNetworkConnection(); } public AMQShortString genQueueName() @@ -54,7 +54,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase } } - private AMQProtSession _testSession; + private TestProtocolSession _testSession; protected void setUp() throws Exception { @@ -62,10 +62,9 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); - protocolHandler.setNetworkDriver(new TestNetworkDriver()); //don't care about the values set here apart from the dummy IoSession - _testSession = new AMQProtSession(protocolHandler , con); + _testSession = new TestProtocolSession(protocolHandler , con); } public void testTemporaryQueueWildcard() throws UnknownHostException @@ -100,7 +99,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase private void checkTempQueueName(SocketAddress address, String queueName) { - _testSession.getNetworkDriver().setLocalAddress(address); + ((TestNetworkConnection) _testSession.getNetworkConnection()).setAddress(address); assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index f32155064a..c366d956e9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -17,6 +17,8 @@ */ package org.apache.qpid.test.utils; +import static org.apache.qpid.client.AMQConnection.*; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -54,18 +56,20 @@ import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.BrokerOptions; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.store.DerbyMessageStore; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.mina.MinaNetworkTransport; +import org.apache.qpid.transport.vm.VmBroker; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.LogMonitor; @@ -82,8 +86,6 @@ public class QpidBrokerTestCase extends QpidTestCase protected long RECEIVE_TIMEOUT = 1000l; - private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); - private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); private Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>(); private XMLConfiguration _testConfiguration = new XMLConfiguration(); @@ -122,8 +124,6 @@ public class QpidBrokerTestCase extends QpidTestCase protected static final String CPP = "cpp"; protected static final String VM = "vm"; protected static final String EXTERNAL = "external"; - private static final String VERSION_08 = "0-8"; - private static final String VERSION_010 = "0-10"; private static final String VERSION_0_8 = "0-8"; private static final String VERSION_0_9 = "0-9"; @@ -142,7 +142,7 @@ public class QpidBrokerTestCase extends QpidTestCase protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); - private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); + private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_0_8); protected String _output = System.getProperty(TEST_OUTPUT); protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); @@ -284,6 +284,11 @@ public class QpidBrokerTestCase extends QpidTestCase @Override protected void setUp() throws Exception { + if (isVmBroker()) + { + Transport.registerOutgoingTransport(MinaNetworkTransport.class); + } + if (!_configFile.exists()) { fail("Unable to test without config file:" + _configFile); @@ -456,26 +461,13 @@ public class QpidBrokerTestCase extends QpidTestCase setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false)); saveTestConfiguration(); - // create an in_VM broker - final ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(_configFile); - try - { - ApplicationRegistry.initialise(registry, port); - } - catch (Exception e) - { - _logger.error("Broker initialise failed due to:",e); - try - { - registry.close(); - } - catch (Throwable closeE) - { - closeE.printStackTrace(); - } - throw e; - } - TransportConnection.createVMBroker(port); + BrokerOptions options = new BrokerOptions(); + options.setProtocol(VM); + options.setBind("localhost"); + options.setPorts(port); + options.setConfigFile(_configFile.getAbsolutePath()); + + VmBroker.createVMBroker(options); } else if (!_broker.equals(EXTERNAL)) { @@ -661,7 +653,7 @@ public class QpidBrokerTestCase extends QpidTestCase { port = getPort(port); - _logger.info("stopping broker: " + getBrokerCommand(port)); + _logger.info("stopping broker: " + getBrokerCommand(port) + " on port " + port); Process process = _brokers.remove(port); if (process != null) { @@ -671,8 +663,7 @@ public class QpidBrokerTestCase extends QpidTestCase } else if (_broker.equals(VM)) { - TransportConnection.killVMBroker(port); - ApplicationRegistry.remove(port); + VmBroker.killVMBroker(); } } @@ -760,105 +751,6 @@ public class QpidBrokerTestCase extends QpidTestCase } /** - * Set a System property that is to be applied only to the external test - * broker. - * - * This is a convenience method to enable the setting of a -Dproperty=value - * entry in QPID_OPTS - * - * This is only useful for the External Java Broker tests. - * - * @param property the property name - * @param value the value to set the property to - */ - protected void setBrokerOnlySystemProperty(String property, String value) - { - if (!_propertiesSetForBroker.containsKey(property)) - { - _propertiesSetForBroker.put(property, value); - } - - } - - /** - * Set a System (-D) property for this test run. - * - * This convenience method copies the current VMs System Property - * for the external VM Broker. - * - * @param property the System property to set - */ - protected void setSystemProperty(String property) - { - setSystemProperty(property, System.getProperty(property)); - } - - /** - * Set a System property for the duration of this test. - * - * When the test run is complete the value will be reverted. - * - * The values set using this method will also be propogated to the external - * Java Broker via a -D value defined in QPID_OPTS. - * - * If the value should not be set on the broker then use - * setTestClientSystemProperty(). - * - * @param property the property to set - * @param value the new value to use - */ - protected void setSystemProperty(String property, String value) - { - // Record the value for the external broker - _propertiesSetForBroker.put(property, value); - - //Set the value for the test client vm aswell. - setTestClientSystemProperty(property, value); - } - - /** - * Set a System (-D) property for the external Broker of this test. - * - * @param property The property to set - * @param value the value to set it to. - */ - protected void setTestClientSystemProperty(String property, String value) - { - if (!_propertiesSetForTestOnly.containsKey(property)) - { - // Record the current value so we can revert it later. - _propertiesSetForTestOnly.put(property, System.getProperty(property)); - } - - System.setProperty(property, value); - } - - /** - * Restore the System property values that were set before this test run. - */ - protected void revertSystemProperties() - { - for (String key : _propertiesSetForTestOnly.keySet()) - { - String value = _propertiesSetForTestOnly.get(key); - if (value != null) - { - System.setProperty(key, value); - } - else - { - System.clearProperty(key); - } - } - - _propertiesSetForTestOnly.clear(); - - // We don't change the current VMs settings for Broker only properties - // so we can just clear this map - _propertiesSetForBroker.clear(); - } - - /** * Add an environtmen variable for the external broker environment * * @param property the property to set @@ -910,27 +802,39 @@ public class QpidBrokerTestCase extends QpidTestCase */ public boolean isBroker08() { - return _brokerVersion.equals(VERSION_08); + return _brokerVersion.equals(VERSION_0_8); + } + + public boolean isBroker09() + { + return _brokerVersion.equals(VERSION_0_9) || + _brokerVersion.equals(VERSION_0_9_1) || + _brokerVersion.equals(VERSION_0_91); } public boolean isBroker010() { - return _brokerVersion.equals(VERSION_010); + return _brokerVersion.equals(VERSION_0_10); + } + + protected boolean isVmBroker() + { + return _broker.equals(VM); } protected boolean isJavaBroker() { - return _brokerLanguage.equals("java") || _broker.equals("vm"); + return _brokerLanguage.equals(JAVA) || _broker.equals(VM); } protected boolean isCppBroker() { - return _brokerLanguage.equals("cpp"); + return _brokerLanguage.equals(CPP); } protected boolean isExternalBroker() { - return !_broker.equals("vm"); + return !_broker.equals(VM); } protected boolean isBrokerStorePersistent() @@ -1085,7 +989,7 @@ public class QpidBrokerTestCase extends QpidTestCase } - protected void tearDown() throws java.lang.Exception + protected void tearDown() throws Exception { try { @@ -1095,9 +999,10 @@ public class QpidBrokerTestCase extends QpidTestCase c.close(); } } - finally{ + finally + { // Ensure any problems with close does not interfer with property resets - revertSystemProperties(); + super.tearDown(); revertLoggingLevels(); } } diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 3486d5c70c..9ce968fcc1 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -31,6 +31,10 @@ org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFails org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteNoTxPubSub org.apache.qpid.test.testcases.MandatoryMessageTest#test_QPID_508_MandatoryFailsNoRouteTxPubSub +// Failover for transacted sessions does not work (QPID-2994) +org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser + //this test checks explicitly for 0-8 flow control semantics org.apache.qpid.test.client.FlowControlTest#* @@ -74,8 +78,11 @@ org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage org.apache.qpid.test.unit.ack.RecoverTest#testRecoverInAutoAckListener +// Mina does not suopport idle timeouts +org.apache.qpid.transport.network.mina.MinaNetworkTransportTest#testSetReadIdle +org.apache.qpid.transport.network.mina.MinaNetworkTransportTest#testSetWriteIdle + //Temporarily adding the following until the issues are sorted out. //Should probably raise JIRAs for them. -org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode diff --git a/qpid/java/test-profiles/default.testprofile b/qpid/java/test-profiles/default.testprofile index df8148f787..fe710f0e29 100644 --- a/qpid/java/test-profiles/default.testprofile +++ b/qpid/java/test-profiles/default.testprofile @@ -19,7 +19,8 @@ java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory java.naming.provider.url=${test.profiles}/test-provider.properties -broker.version=0-8 +broker.version=0-9-1 +qpid.amqp.version= broker=vm broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work broker.ready=Listening on TCP port |