diff options
12 files changed, 950 insertions, 989 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 41aa22b8ef..449b52d737 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,18 +20,6 @@ */ package org.apache.qpid.server; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.logging.*; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -40,28 +28,9 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; -import org.apache.qpid.server.information.management.ServerInformationMBean; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.management.LoggingManagementMBean; -import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; +import org.apache.qpid.server.Broker.InitException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; + /** * Main entry point for AMQPD. @@ -69,39 +38,41 @@ import org.apache.qpid.transport.network.mina.MINANetworkDriver; */ public class Main { - private static Logger _logger; - - private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; - - public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - public static final String QPID_HOME = "QPID_HOME"; - private static final int IPV4_ADDRESS_LENGTH = 4; + private final Options options = new Options(); + private CommandLine commandLine; - private static final char IPV4_LITERAL_SEPARATOR = '.'; - private java.util.logging.Logger FRAME_LOGGER; - private java.util.logging.Logger RAW_LOGGER; - - protected static class InitException extends Exception + public static void main(String[] args) { - InitException(String msg, Throwable cause) + //if the -Dlog4j.configuration property has not been set, enable the init override + //to stop Log4J wondering off and picking up the first log4j.xml/properties file it + //finds from the classpath when we get the first Loggers + if(System.getProperty("log4j.configuration") == null) { - super(msg, cause); + System.setProperty("log4j.defaultInitOverride", "true"); } - } - protected final Options options = new Options(); - protected CommandLine commandLine; + new Main(args); + } - protected Main(String[] args) + public Main(final String[] args) { setOptions(options); if (parseCommandline(args)) { - execute(); + try + { + execute(); + } + catch(Exception e) + { + System.err.println("Exception during startup: " + e); + e.printStackTrace(); + shutdown(1); + } } } - protected boolean parseCommandline(String[] args) + protected boolean parseCommandline(final String[] args) { try { @@ -119,8 +90,7 @@ public class Main } } - @SuppressWarnings("static-access") - protected void setOptions(Options options) + protected void setOptions(final Options options) { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); @@ -164,16 +134,21 @@ public class Main Option bind = OptionBuilder.withArgName("bind").hasArg() .withDescription("bind to the specified address. Overrides any value in the config file") - .withLongOpt("bind").create("b"); + .withLongOpt("bind").create(BrokerOptions.BIND); Option logconfig = OptionBuilder.withArgName("logconfig").hasArg() .withDescription("use the specified log4j xml configuration file. By " - + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME - + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); + + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE + + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG); Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg() .withDescription("monitor the log file configuration file for changes. Units are seconds. " - + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); + + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH); + + Option sslport = + OptionBuilder.withArgName("sslport").hasArg() + .withDescription("SSL port. Overrides any value in the config file") + .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS); options.addOption(help); options.addOption(version); @@ -187,472 +162,120 @@ public class Main options.addOption(exclude0_8); options.addOption(mport); options.addOption(bind); + options.addOption(sslport); } - protected void execute() + protected void execute() throws Exception { - // note this understands either --help or -h. If an option only has a long name you can use that but if - // an option has a short name and a long name you must use the short name here. - if (commandLine.hasOption("h")) + BrokerOptions options = new BrokerOptions(); + String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG); + if(configFile != null) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("Qpid", options, true); + options.setConfigFile(configFile); } - else if (commandLine.hasOption("v")) - { - String ver = QpidProperties.getVersionString(); - - StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); - - boolean first = true; - for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) - { - if (first) - { - first = false; - } - else - { - protocol.append(", "); - } - - protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); - - } - System.out.println(ver + " (" + protocol + ")"); - } - else + String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH); + if(logWatchConfig != null) { - try - { - CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); - startup(); - CurrentActor.remove(); - } - catch (InitException e) - { - System.out.println("Initialisation Error : " + e.getMessage()); - shutdown(1); - } - catch (Throwable e) - { - System.out.println("Error initialising message broker: " + e); - e.printStackTrace(); - shutdown(1); - } + options.setLogWatchFrequency(Integer.parseInt(logWatchConfig)); } - } - - protected void shutdown(int status) - { - ApplicationRegistry.removeAll(); - System.exit(status); - } - - protected void startup() throws Exception - { - - FRAME_LOGGER = updateLogger("FRM", "qpid-frame.log"); - RAW_LOGGER = updateLogger("RAW", "qpid-raw.log"); - - final String QpidHome = System.getProperty(QPID_HOME); - final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); - final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); - if (!configFile.exists()) - { - String error = "File " + configFile + " could not be found. Check the file exists and is readable."; - if (QpidHome == null) - { - error = error + "\nNote: " + QPID_HOME + " is not set."; - } - - throw new InitException(error, null); - } - else + String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG); + if(logConfig != null) { - CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); + options.setLogConfigFile(logConfig); } - String logConfig = commandLine.getOptionValue("l"); - String logWatchConfig = commandLine.getOptionValue("w", "0"); - - int logWatchTime = 0; - try - { - logWatchTime = Integer.parseInt(logWatchConfig); - } - catch (NumberFormatException e) + String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT); + if(jmxPort != null) { - System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " - + "a non-negative integer. Using default of zero (no watching configured"); + options.setJmxPort(Integer.parseInt(jmxPort)); } - File logConfigFile; - if (logConfig != null) - { - logConfigFile = new File(logConfig); - configureLogging(logConfigFile, logWatchTime); - } - else + String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND); + if (bindAddr != null) { - File configFileDirectory = configFile.getParentFile(); - logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME); - configureLogging(logConfigFile, logWatchTime); + options.setBind(bindAddr); } - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - ServerConfiguration serverConfig = config.getConfiguration(); - updateManagementPort(serverConfig, commandLine.getOptionValue("m")); - - ApplicationRegistry.initialise(config); - - // We have already loaded the BrokerMessages class by this point so we - // need to refresh the locale setting incase we had a different value in - // the configuration. - BrokerMessages.reload(); - - // AR.initialise() sets and removes its own actor so we now need to set the actor - // for the remainder of the startup, and the default actor if the stack is empty - CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); - CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); - GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); - - - try + String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS); + if(portStr != null) { - configureLoggingManagementMBean(logConfigFile, logWatchTime); - - ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); - configMBean.register(); - - ServerInformationMBean sysInfoMBean = - new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); - sysInfoMBean.register(); - - - String[] portStr = commandLine.getOptionValues("p"); - - Set<Integer> ports = new HashSet<Integer>(); - Set<Integer> exclude_0_10 = new HashSet<Integer>(); - Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); - Set<Integer> exclude_0_9 = new HashSet<Integer>(); - Set<Integer> exclude_0_8 = new HashSet<Integer>(); - - if(portStr == null || portStr.length == 0) - { - - parsePortList(ports, serverConfig.getPorts()); - parsePortList(exclude_0_10, serverConfig.getPortExclude010()); - parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); - parsePortList(exclude_0_9, serverConfig.getPortExclude09()); - parsePortList(exclude_0_8, serverConfig.getPortExclude08()); - - } - else - { - parsePortArray(ports, portStr); - parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); - parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); - parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); - parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); - - } - - - - - String bindAddr = commandLine.getOptionValue("b"); - if (bindAddr == null) - { - bindAddr = serverConfig.getBind(); - } - InetAddress bindAddress = null; - - - - if (bindAddr.equals("wildcard")) + parsePortArray(options, portStr, false); + for(ProtocolExclusion pe : ProtocolExclusion.values()) { - bindAddress = new InetSocketAddress(0).getAddress(); + parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); } - else - { - bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); - } - - String hostName = bindAddress.getCanonicalHostName(); - - - String keystorePath = serverConfig.getKeystorePath(); - String keystorePassword = serverConfig.getKeystorePassword(); - String certType = serverConfig.getCertType(); - SSLContextFactory sslFactory = null; - - if (!serverConfig.getSSLOnly()) - { - - for(int port : ports) - { - - NetworkDriver driver = new MINANetworkDriver(); - - Set<VERSION> supported = EnumSet.allOf(VERSION.class); - - if(exclude_0_10.contains(port)) - { - supported.remove(VERSION.v0_10); - } - - if(exclude_0_9_1.contains(port)) - { - supported.remove(VERSION.v0_9_1); - } - if(exclude_0_9.contains(port)) - { - supported.remove(VERSION.v0_9); - } - if(exclude_0_8.contains(port)) - { - supported.remove(VERSION.v0_8); - } - - MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); - - - - driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, - serverConfig.getNetworkConfiguration(), null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - - } - - } - - if (serverConfig.getEnableSSL()) - { - sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - NetworkDriver driver = new MINANetworkDriver(); - driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, - new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()), - new QpidAcceptor(driver,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort())); - } - - CurrentActor.get().message(BrokerMessages.READY()); - } - finally - { - // Startup is complete so remove the AR initialised Startup actor - CurrentActor.remove(); - } - - - } - - private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException - { - java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType); - logger.setLevel(Level.FINE); - Formatter formatter = new Formatter() + String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS); + if(sslPortStr != null) { - @Override - public String format(final LogRecord record) + parsePortArray(options, sslPortStr, true); + for(ProtocolExclusion pe : ProtocolExclusion.values()) { - - return "[" + record.getMillis() + " "+ logType +"]\t" + record.getMessage() + "\n"; + parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); } - }; - for(Handler handler : logger.getHandlers()) - { - logger.removeHandler(handler); } - Handler handler = new ConsoleHandler(); - - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - - logger.addHandler(handler); - - - handler = new FileHandler(logFileName, true); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); + + startBroker(options); + } - logger.addHandler(handler); - return logger; + protected void startBroker(final BrokerOptions options) throws Exception + { + Broker broker = new Broker(); + broker.startup(options); } - private void parsePortArray(Set<Integer> ports, String[] portStr) - throws InitException + protected void shutdown(final int status) { - if(portStr != null) - { - for(int i = 0; i < portStr.length; i++) - { - try - { - ports.add(Integer.parseInt(portStr[i])); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid port: " + portStr[i], e); - } - } - } + ApplicationRegistry.remove(); + System.exit(status); } - private void parsePortList(Set<Integer> output, List input) - throws InitException + private static void parsePortArray(final BrokerOptions options,final Object[] ports, + final boolean ssl) throws InitException { - if(input != null) + if(ports != null) { - for(Object port : input) + for(int i = 0; i < ports.length; i++) { try { - output.add(Integer.parseInt(String.valueOf(port))); + if(ssl) + { + options.addSSLPort(Integer.parseInt(String.valueOf(ports[i]))); + } + else + { + options.addPort(Integer.parseInt(String.valueOf(ports[i]))); + } } catch (NumberFormatException e) { - throw new InitException("Invalid port: " + port, e); + throw new InitException("Invalid port: " + ports[i], e); } } } } - /** - * Update the configuration data with the management port. - * @param configuration - * @param managementPort The string from the command line - */ - private void updateManagementPort(ServerConfiguration configuration, String managementPort) + private static void parsePortArray(final BrokerOptions options, final Object[] ports, + final ProtocolExclusion excludedProtocol) throws InitException { - if (managementPort != null) + if(ports != null) { - try - { - configuration.setJMXManagementPort(Integer.parseInt(managementPort)); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); - } - } - } - - public static void main(String[] args) - { - //if the -Dlog4j.configuration property has not been set, enable the init override - //to stop Log4J wondering off and picking up the first log4j.xml/properties file it - //finds from the classpath when we get the first Loggers - if(System.getProperty("log4j.configuration") == null) - { - System.setProperty("log4j.defaultInitOverride", "true"); - } - - //now that the override status is know, we can instantiate the Loggers - _logger = Logger.getLogger(Main.class); - - new Main(args); - } - - private byte[] parseIP(String address) throws Exception - { - char[] literalBuffer = address.toCharArray(); - int byteCount = 0; - int currByte = 0; - byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; - for (int i = 0; i < literalBuffer.length; i++) - { - char currChar = literalBuffer[i]; - if ((currChar >= '0') && (currChar <= '9')) - { - currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); - } - - if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) - { - ip[byteCount++] = (byte) currByte; - currByte = 0; - } - } - - if (byteCount != 4) - { - throw new Exception("Invalid IP address: " + address); - } - return ip; - } - - private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException - { - if (logConfigFile.exists() && logConfigFile.canRead()) - { - CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); - - if (logWatchTime > 0) - { - System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " - + logWatchTime + " seconds"); - // log4j expects the watch interval in milliseconds - try - { - QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); - } - catch (Exception e) - { - throw new InitException(e.getMessage(),e); - } - } - else + for(int i = 0; i < ports.length; i++) { try { - QpidLog4JConfigurator.configure(logConfigFile.getPath()); + options.addExcludedPort(excludedProtocol, + Integer.parseInt(String.valueOf(ports[i]))); } - catch (Exception e) - { - throw new InitException(e.getMessage(),e); - } - } - } - else - { - System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); - System.err.println("Using the fallback internal log4j.properties configuration"); - - InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); - if(propsFile == null) - { - throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); - } - else - { - try - { - Properties fallbackProps = new Properties(); - fallbackProps.load(propsFile); - PropertyConfigurator.configure(fallbackProps); - } - finally + catch (NumberFormatException e) { - propsFile.close(); + throw new InitException("Invalid port for exclusion: " + ports[i], e); } } } } - - private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception - { - LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); - - blm.register(); - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 500a34b4a8..460ea93509 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,56 +20,44 @@ */ package org.apache.qpid.server.protocol; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; - -import java.util.Set; -import java.util.Arrays; -import java.util.HashSet; +import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - ; - - - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }; - - private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); + private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class); + private static final AtomicLong ID_GENERATOR = new AtomicLong(0); private final IApplicationRegistry _appRegistry; private final String _fqdn; - private final Set<VERSION> _supported; - + private final Set<AmqpProtocolVersion> _supported; public MultiVersionProtocolEngineFactory() { - this(1, "localhost", ALL_VERSIONS); - } - - public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions) - { - this(1, fqdn, versions); + this("localhost", ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(String fqdn) { - this(1, fqdn, ALL_VERSIONS); + this(fqdn, ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions) + public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions) { - _appRegistry = ApplicationRegistry.getInstance(instance); + _appRegistry = ApplicationRegistry.getInstance(); _fqdn = fqdn; _supported = supportedVersions; } - - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public ServerProtocolEngine newProtocolEngine(NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 3a89194eb9..a095ef47ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.PrincipalHolder; +import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -83,7 +83,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** null means shared */ private final AMQShortString _owner; - private PrincipalHolder _prinicpalHolder; + private AuthorizationHolder _authorizationHolder; private boolean _exclusive = false; private AMQSessionModel _exclusiveOwner; @@ -102,9 +102,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final QueueEntryList _entries; - protected final SubscriptionList _subscriptionList = new SubscriptionList(this); - - private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); + protected final SubscriptionList _subscriptionList = new SubscriptionList(); private volatile Subscription _exclusiveSubscriber; @@ -188,7 +186,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //TODO : persist creation time private long _createTime = System.currentTimeMillis(); private ConfigurationPlugin _queueConfiguration; - private final boolean _isTopic; + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) @@ -234,12 +232,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments == null ? Collections.EMPTY_MAP : arguments; + _arguments = arguments; _id = virtualHost.getConfigStore().createId(); - _isTopic = arguments != null && arguments.containsKey("topic"); - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); _logSubject = new QueueLogSubject(this); @@ -331,7 +327,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } - + public void setExclusive(boolean exclusive) throws AMQException { _exclusive = exclusive; @@ -375,14 +371,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _owner; } - public PrincipalHolder getPrincipalHolder() + public AuthorizationHolder getAuthorizationHolder() { - return _prinicpalHolder; + return _authorizationHolder; } - public void setPrincipalHolder(PrincipalHolder prinicpalHolder) + public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) { - _prinicpalHolder = prinicpalHolder; + _authorizationHolder = authorizationHolder; } @@ -406,8 +402,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied"); } - - + + if (hasExclusiveSubscriber()) { throw new ExistingExclusiveSubscription(); @@ -437,14 +433,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener subscription.setNoLocal(_nolocal); } _subscriptionList.add(subscription); - + //Increment consumerCountHigh if necessary. (un)registerSubscription are both //synchronized methods so we don't need additional synchronization here if(_counsumerCountHigh.get() < getConsumerCount()) { _counsumerCountHigh.incrementAndGet(); } - + if (isDeleted()) { subscription.queueDeleted(this); @@ -490,11 +486,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // queue. This is because the delete method uses the subscription set which has just been cleared subscription.queueDeleted(this); } - - if(_subscriptionList.size() == 0 && _isTopic) - { - clearQueue(); - } } } @@ -521,10 +512,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener break; } } - + reconfigure(); } - + private void reconfigure() { //Reconfigure the queue for to reflect this new binding. @@ -550,7 +541,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void removeBinding(final Binding binding) { _bindings.remove(binding); - + reconfigure(); } @@ -577,104 +568,101 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException { + incrementTxnEnqueueStats(message); + incrementQueueCount(); + incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); + QueueEntry entry; Subscription exclusiveSub = _exclusiveSubscriber; - if(!_isTopic || _subscriptionList.size()!=0) - { - incrementTxnEnqueueStats(message); - incrementQueueCount(); - incrementQueueSize(message); - QueueEntry entry; + if (exclusiveSub != null) + { + exclusiveSub.getSendLock(); - if (exclusiveSub != null) + try { - exclusiveSub.getSendLock(); - - try - { - entry = _entries.add(message); + entry = _entries.add(message); - deliverToSubscription(exclusiveSub, entry); - } - finally - { - exclusiveSub.releaseSendLock(); - } + deliverToSubscription(exclusiveSub, entry); } - else + finally { - entry = _entries.add(message); - /* + exclusiveSub.releaseSendLock(); + } + } + else + { + entry = _entries.add(message); + /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message - */ - SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); - SubscriptionList.SubscriptionNode nextNode = node.getNext(); - if (nextNode == null) + */ + SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); + SubscriptionList.SubscriptionNode nextNode = node.findNext(); + if (nextNode == null) + { + nextNode = _subscriptionList.getHead().findNext(); + } + while (nextNode != null) + { + if (_subscriptionList.updateMarkedNode(node, nextNode)) { - nextNode = _subscriptionList.getHead().getNext(); + break; } - while (nextNode != null) + else { - if (_lastSubscriptionNode.compareAndSet(node, nextNode)) - { - break; - } - else + node = _subscriptionList.getMarkedNode(); + nextNode = node.findNext(); + if (nextNode == null) { - node = _lastSubscriptionNode.get(); - nextNode = node.getNext(); - if (nextNode == null) - { - nextNode = _subscriptionList.getHead().getNext(); - } + nextNode = _subscriptionList.getHead().findNext(); } } + } - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; - while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) + while (entry.isAvailable() && loops != 0) + { + if (nextNode == null) { - if (nextNode == null) - { - loops--; - nextNode = _subscriptionList.getHead(); - } - else - { - // if subscription at end, and active, offer - Subscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); - } - nextNode = nextNode.getNext(); - + loops--; + nextNode = _subscriptionList.getHead(); + } + else + { + // if subscription at end, and active, offer + Subscription sub = nextNode.getSubscription(); + deliverToSubscription(sub, entry); } + nextNode = nextNode.findNext(); + } + } - if (!(entry.isAcquired() || entry.isDeleted())) - { - checkSubscriptionsNotAheadOfDelivery(entry); + if (entry.isAvailable()) + { + checkSubscriptionsNotAheadOfDelivery(entry); - deliverAsync(); - } + deliverAsync(); + } - if(_managedObject != null) - { - _managedObject.checkForNotification(entry.getMessage()); - } + if(_managedObject != null) + { + _managedObject.checkForNotification(entry.getMessage()); + } - if(action != null) - { - action.onEnqueue(entry); - } + if(action != null) + { + action.onEnqueue(entry); } + } private void deliverToSubscription(final Subscription sub, final QueueEntry entry) @@ -730,20 +718,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { getAtomicQueueCount().incrementAndGet(); } - + private void incrementTxnEnqueueStats(final ServerMessage message) { SessionConfig session = message.getSessionConfig(); - + if(session !=null && session.isTransactional()) { _msgTxnEnqueues.incrementAndGet(); _byteTxnEnqueues.addAndGet(message.getSize()); } } - + private void incrementTxnDequeueStats(QueueEntry entry) - { + { _msgTxnDequeues.incrementAndGet(); _byteTxnDequeues.addAndGet(entry.getSize()); } @@ -757,40 +745,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener incrementUnackedMsgCount(); sub.send(entry); - - if(_isTopic) - { - if(allSubscriptionsAhead(entry) && entry.acquire()) - { - entry.discard(); - } - } - } - - private boolean allSubscriptionsAhead(final QueueEntry entry) - { - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && !entry.isAcquired()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - if(!subscription.isClosed()) - { - QueueContext context = (QueueContext) subscription.getQueueContext(); - if(context != null) - { - QueueEntry subnode = context._lastSeenEntry; - if(subnode.compareTo(entry)<0) - { - return false; - } - } - else - { - return false; - } - } - } - return true; } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException @@ -849,24 +803,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void requeue(QueueEntryImpl entry, Subscription subscription) - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - } - public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); @@ -875,7 +811,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.decrementAndGet(); } - + if(sub != null && sub.isSessionTransactional()) { incrementTxnDequeueStats(entry); @@ -932,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _subscriptionList.size(); } - + public int getConsumerCountHigh() { return _counsumerCountHigh.get(); @@ -1004,7 +940,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) + if (node != null && !node.isDispensed()) { entryList.add(node); } @@ -1108,7 +1044,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) + if (!node.isDispensed() && filter.accept(node)) { entryList.add(node); } @@ -1302,7 +1238,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if ((messageId >= fromMessageId) && (messageId <= toMessageId) - && !node.isDeleted() && node.acquire()) { dequeueEntry(node); @@ -1332,7 +1267,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) + if (node.acquire()) { dequeueEntry(node); noDeletes = false; @@ -1342,7 +1277,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } public long clearQueue() throws AMQException - { + { return clear(0l); } @@ -1353,7 +1288,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: queue " + getName()); } - + QueueEntryIterator queueListIterator = _entries.iterator(); long count = 0; @@ -1362,7 +1297,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDeleted() && node.acquire()) + if (node.acquire()) { dequeueEntry(node, txn); if(++count == request) @@ -1420,7 +1355,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new AMQSecurityException("Permission denied: " + getName()); } - + if (!_deleted.getAndSet(true)) { @@ -1629,7 +1564,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - Runner runner = new Runner(_stateChangeCount.incrementAndGet()); + QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1648,52 +1583,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } - - private class Runner implements ReadWriteRunnable - { - String _name; - public Runner(long count) - { - _name = "QueueRunner-" + count + "-" + _logActor; - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_logActor); - - processQueue(this); - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } - } - public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1714,9 +1603,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { sub.getSendLock(); atTail = attemptDelivery(sub); - if (atTail && getNextAvailableEntry(sub) == null) + if (atTail && sub.isAutoClose()) { - sub.queueEmpty(); + unregisterSubscription(sub); + + sub.confirmAutoClose(); + } else if (!atTail) { @@ -1737,7 +1629,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } - return atTail; } @@ -1760,7 +1651,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = getNextAvailableEntry(sub); - if (node != null && !(node.isAcquired() || node.isDeleted())) + if (node != null && node.isAvailable()) { if (sub.hasInterest(node)) { @@ -1821,7 +1712,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) { if (expired) { @@ -1850,14 +1741,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - private void processQueue(Runnable runner) throws AMQException + /** + * Used by queue Runners to asynchronously deliver messages to consumers. + * + * A queue Runner is started whenever a state change occurs, e.g when a new + * message arrives on the queue and cannot be immediately delivered to a + * subscription (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to subscriptions unsuspending) which are + * capable of accepting/delivering all messages then these messages would + * otherwise remain on the queue. + * + * processQueue should be running while there are messages on the queue AND + * there are subscriptions that can deliver them. If there are no + * subscriptions capable of delivering the remaining messages on the queue + * then processQueue should stop to prevent spinning. + * + * Since processQueue is runs in a fixed size Executor, it should not run + * indefinitely to prevent starving other tasks of CPU (e.g jobs to process + * incoming messages may not be able to be scheduled in the thread pool + * because all threads are working on clearing down large queues). To solve + * this problem, after an arbitrary number of message deliveries the + * processQueue job stops iterating, resubmits itself to the executor, and + * ends the current instance + * + * @param runner the Runner to schedule + * @throws AMQException + */ + public void processQueue(QueueRunner runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - int extraLoops = 1; - long iterations = MAX_ASYNC_DELIVERIES; + boolean lastLoop = false; + int iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1874,12 +1791,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - extraLoops = 1; + //further asynchronous delivery is required since the + //previous loop. keep going if iteration slicing allows. + lastLoop = false; } previousStateChangeCount = stateChangeCount; - deliveryIncomplete = _subscriptionList.size() != 0; - boolean done; + boolean allSubscriptionsDone = true; + boolean subscriptionDone; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1889,29 +1808,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - - done = attemptDelivery(sub); - - if (done) + //attempt delivery. returns true if no further delivery currently possible to this sub + subscriptionDone = attemptDelivery(sub); + if (subscriptionDone) { - if (extraLoops == 0) + //close autoClose subscriptions if we are not currently intent on continuing + if (lastLoop && sub.isAutoClose()) { - if(getNextAvailableEntry(sub) == null) - { - sub.queueEmpty(); - } - deliveryIncomplete = false; + unregisterSubscription(sub); - } - else - { - extraLoops--; + sub.confirmAutoClose(); } } else { + //this subscription can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allSubscriptionsDone = false; + lastLoop = false; iterations--; - extraLoops = 1; } } finally @@ -1919,10 +1834,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } + + if(allSubscriptionsDone && lastLoop) + { + //We have done an extra loop already and there are again + //again no further delivery attempts possible, only + //keep going if state change demands it. + deliveryIncomplete = false; + } + else if(allSubscriptionsDone) + { + //All subscriptions reported being done, but we have to do + //an extra loop if the iterations are not exhausted and + //there is still any work to be done + deliveryIncomplete = _subscriptionList.size() != 0; + lastLoop = true; + } + else + { + //some subscriptions can still accept more messages, + //keep going if iteration count allows. + lastLoop = false; + deliveryIncomplete = true; + } + _asynchronousRunner.set(null); } - // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit + // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { @@ -1942,8 +1881,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted - if (!node.isDeleted()) + // Only process nodes that are not currently deleted and not dequeued + if (!node.isDispensed()) { // If the node has exired then aquire it if (node.expired() && node.acquire()) @@ -2207,22 +2146,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _dequeueSize.get(); } - + public long getByteTxnEnqueues() { return _byteTxnEnqueues.get(); } - + public long getByteTxnDequeues() { return _byteTxnDequeues.get(); } - + public long getMsgTxnEnqueues() { return _msgTxnEnqueues.get(); } - + public long getMsgTxnDequeues() { return _msgTxnDequeues.get(); @@ -2259,21 +2198,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _unackedMsgCountHigh.get(); } - + public long getUnackedMessageCount() { return _unackedMsgCount.get(); } - + public void decrementUnackedMsgCount() { _unackedMsgCount.decrementAndGet(); } - + private void incrementUnackedMsgCount() { long unackedMsgCount = _unackedMsgCount.incrementAndGet(); - + long unackedMsgCountHigh; while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get())) { @@ -2283,4 +2222,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } + + public LogActor getLogActor() + { + return _logActor; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index 15651b088b..4c59c25d84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -20,20 +20,73 @@ */ package org.apache.qpid.server.security.auth.manager; +import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.qpid.amqp_1_0.transport.CallbackHanderSource; import org.apache.qpid.common.Closeable; +import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.security.auth.AuthenticationResult; -public interface AuthenticationManager extends Closeable, CallbackHanderSource +/** + * Implementations of the AuthenticationManager are responsible for determining + * the authenticity of a user's credentials. + * + * If the authentication is successful, the manager is responsible for producing a populated + * {@link Subject} containing the user's identity and zero or more principals representing + * groups to which the user belongs. + * <p> + * The {@link #initialise()} method is responsible for registering SASL mechanisms required by + * the manager. The {@link #close()} method must reverse this registration. + * + */ +public interface AuthenticationManager extends Closeable, Plugin { + /** The name for the required SASL Server mechanisms */ + public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; + + /** + * Initialise the authentication plugin. + * + */ + void initialise(); + + /** + * Gets the SASL mechanisms known to this manager. + * + * @return SASL mechanism names, space separated. + */ String getMechanisms(); + /** + * Creates a SASL server for the specified mechanism name for the given + * fully qualified domain name. + * + * @param mechanism mechanism name + * @param localFQDN domain name + * + * @return SASL server + * @throws SaslException + */ SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException; + /** + * Authenticates a user using SASL negotiation. + * + * @param server SASL server + * @param response SASL response to process + * + * @return authentication result + */ AuthenticationResult authenticate(SaslServer server, byte[] response); - + /** + * Authenticates a user using their username and password. + * + * @param username username + * @param password password + * + * @return authentication result + */ + AuthenticationResult authenticate(String username, String password); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index bbd90b4d53..1945c2e15f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -20,27 +20,65 @@ */ package org.apache.qpid.server.security.auth.manager; -import org.apache.log4j.Logger; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.Security; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.login.AccountNotFoundException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.log4j.Logger; +import org.apache.qpid.configuration.PropertyException; +import org.apache.qpid.configuration.PropertyUtils; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.sasl.JCAProvider; +import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; -import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.sasl.JCAProvider; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import javax.security.auth.callback.CallbackHandler; -import javax.security.sasl.SaslServerFactory; -import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslException; -import javax.security.sasl.Sasl; -import java.util.Map; -import java.util.HashMap; -import java.util.TreeMap; -import java.security.Security; +/** + * Concrete implementation of the AuthenticationManager that determines if supplied + * user credentials match those appearing in a PrincipalDatabase. The implementation + * of the PrincipalDatabase is determined from the configuration. + * + * This implementation also registers the JMX UserManagemement MBean. + * + * This plugin expects configuration such as: + * + * <pre> + * <pd-auth-manager> + * <principal-database> + * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class> + * <attributes> + * <attribute> + * <name>passwordFile</name> + * <value>${conf}/passwd</value> + * </attribute> + * </attributes> + * </principal-database> + * </pd-auth-manager> + * </pre> + */ public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager { private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class); @@ -49,55 +87,109 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private String _mechanisms; /** Maps from the mechanism to the callback handler to use for handling those requests */ - private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); + private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); /** * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for * details of the use of these properties. This map is populated during initialisation of each provider. */ - private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); + private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); - private AuthenticationManager _default = null; - /** The name for the required SASL Server mechanisms */ - public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; + protected PrincipalDatabase _principalDatabase = null; - public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception - { - _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'") - + " PrincipalDatabase authentication manager."); + protected AMQUserManagementMBean _mbean = null; - // Fixme This should be done per Vhost but allowing global hack isn't right but ... - // required as authentication is done before Vhost selection + public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>() + { + public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException + { + final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName()); - Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); + // If there is no configuration for this plugin then don't load it. + if (configuration == null) + { + _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager"); + return null; + } + final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager(); + pdam.configure(configuration); + pdam.initialise(); + return pdam; + } - if (name == null || hostConfig == null) + public Class<PrincipalDatabaseAuthenticationManager> getPluginClass() { - initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases()); + return PrincipalDatabaseAuthenticationManager.class; } - else + + public String getPluginName() { - String databaseName = hostConfig.getAuthenticationDatabase(); + return PrincipalDatabaseAuthenticationManager.class.getName(); + } + }; - if (databaseName == null) + public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin { + + public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory() + { + public List<String> getParentPaths() { - - _default = ApplicationRegistry.getInstance().getAuthenticationManager(); - return; + return Arrays.asList("security.pd-auth-manager"); } - else + + public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException { - PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName); + final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration(); + + instance.setConfiguration(path, config); + return instance; + } + }; - if (database == null) - { - throw new ConfigurationException("Requested database:" + databaseName + " was not found"); - } + public String[] getElementsProcessed() + { + return new String[] {"principal-database.class", + "principal-database.attributes.attribute.name", + "principal-database.attributes.attribute.value"}; + } - initialiseAuthenticationMechanisms(providerMap, database); + public void validateConfiguration() throws ConfigurationException + { + } + + public String getPrincipalDatabaseClass() + { + return _configuration.getString("principal-database.class"); + } + + public Map<String,String> getPdClassAttributeMap() throws ConfigurationException + { + final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name"); + final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value"); + final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i); + final String argValue = argumentValues.get(i); + + attributes.put(argName, argValue); } + + return Collections.unmodifiableMap(attributes); } + } + + protected PrincipalDatabaseAuthenticationManager() + { + } + + public void initialise() + { + final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); + + initialiseAuthenticationMechanisms(providerMap, _principalDatabase); if (providerMap.size() > 0) { @@ -110,33 +202,16 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan { _logger.info("Additional SASL providers successfully registered."); } - } else { _logger.warn("No additional SASL providers registered."); } + registerManagement(); } - - private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception - { - if (databases.size() > 1) - { - _logger.warn("More than one principle database provided currently authentication mechanism will override each other."); - } - - for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet()) - { - // fixme As the database now provide the mechanisms they support, they will ... - // overwrite each other in the map. There should only be one database per vhost. - // But currently we must have authentication before vhost definition. - initialiseAuthenticationMechanisms(providerMap, entry.getValue()); - } - } - - private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception + private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) { if (database == null || database.getMechanisms().size() == 0) { @@ -152,7 +227,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser, Map<String, Class<? extends SaslServerFactory>> providerMap) - throws Exception { if (_mechanisms == null) { @@ -173,43 +247,37 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan _logger.info("Initialised " + mechanism + " SASL provider successfully"); } + /** + * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin) + */ + public void configure(final ConfigurationPlugin config) throws ConfigurationException + { + final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config; + final String pdClazz = pdamConfig.getPrincipalDatabaseClass(); + + _logger.info("PrincipalDatabase concrete implementation : " + pdClazz); + + _principalDatabase = createPrincipalDatabaseImpl(pdClazz); + + configPrincipalDatabase(_principalDatabase, pdamConfig); + } + public String getMechanisms() { - if (_default != null) - { - // Use the default AuthenticationManager if present - return _default.getMechanisms(); - } - else - { - return _mechanisms; - } + return _mechanisms; } public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException { - if (_default != null) - { - // Use the default AuthenticationManager if present - return _default.createSaslServer(mechanism, localFQDN); - } - else - { - return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), - _callbackHandlerMap.get(mechanism)); - } - + return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), + _callbackHandlerMap.get(mechanism)); } + /** + * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[]) + */ public AuthenticationResult authenticate(SaslServer server, byte[] response) { - // Use the default AuthenticationManager if present - if (_default != null) - { - return _default.authenticate(server, response); - } - - try { // Process response from the client @@ -217,7 +285,9 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan if (server.isComplete()) { - return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS); + final Subject subject = new Subject(); + subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID())); + return new AuthenticationResult(subject); } else { @@ -230,13 +300,164 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan } } + /** + * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String) + */ + public AuthenticationResult authenticate(final String username, final String password) + { + try + { + if (_principalDatabase.verifyPassword(username, password.toCharArray())) + { + final Subject subject = new Subject(); + subject.getPrincipals().add(new UsernamePrincipal(username)); + return new AuthenticationResult(subject); + } + else + { + return new AuthenticationResult(AuthenticationStatus.CONTINUE); + } + } + catch (AccountNotFoundException e) + { + return new AuthenticationResult(AuthenticationStatus.CONTINUE); + } + } + public void close() { + _mechanisms = null; Security.removeProvider(PROVIDER_NAME); + + unregisterManagement(); + } + + private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException + { + try + { + return (PrincipalDatabase) Class.forName(pdClazz).newInstance(); + } + catch (InstantiationException ie) + { + throw new ConfigurationException("Cannot instantiate " + pdClazz, ie); + } + catch (IllegalAccessException iae) + { + throw new ConfigurationException("Cannot access " + pdClazz, iae); + } + catch (ClassNotFoundException cnfe) + { + throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe); + } + catch (ClassCastException cce) + { + throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce); + } } - public CallbackHandler getHandler(String mechanism) + private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config) + throws ConfigurationException { - return _callbackHandlerMap.get(mechanism); + + final Map<String,String> attributes = config.getPdClassAttributeMap(); + + for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();) + { + final Entry<String, String> nameValuePair = iterator.next(); + final String methodName = generateSetterName(nameValuePair.getKey()); + final Method method; + try + { + method = principalDatabase.getClass().getMethod(methodName, String.class); + } + catch (Exception e) + { + throw new ConfigurationException("No method " + methodName + " found in class " + + principalDatabase.getClass() + + " hence unable to configure principal database. The method must be public and " + + "have a single String argument with a void return type", e); + } + try + { + method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue())); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(e.getMessage(), e); + } + catch (PropertyException e) + { + throw new ConfigurationException(e.getMessage(), e); + } + catch (IllegalAccessException e) + { + throw new ConfigurationException(e.getMessage(), e); + } + catch (InvocationTargetException e) + { + // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective + // method call. Pull out the underlying message and cause to make these more apparent to the user. + throw new ConfigurationException(e.getCause().getMessage(), e.getCause()); + } + } + } + + private String generateSetterName(String argName) throws ConfigurationException + { + if ((argName == null) || (argName.length() == 0)) + { + throw new ConfigurationException("Argument names must have length >= 1 character"); + } + + if (Character.isLowerCase(argName.charAt(0))) + { + argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); + } + + final String methodName = "set" + argName; + return methodName; + } + + protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase) + { + _principalDatabase = principalDatabase; + } + + protected void registerManagement() + { + try + { + _logger.info("Registering UserManagementMBean"); + + _mbean = new AMQUserManagementMBean(); + _mbean.setPrincipalDatabase(_principalDatabase); + _mbean.register(); + } + catch (Exception e) + { + _logger.warn("User management disabled as unable to create MBean:", e); + _mbean = null; + } + } + + protected void unregisterManagement() + { + try + { + if (_mbean != null) + { + _logger.info("Unregistering UserManagementMBean"); + _mbean.unregister(); + } + } + catch (Exception e) + { + _logger.warn("Failed to unregister User management MBean:", e); + } + finally + { + _mbean = null; + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java index eb463ee722..17d123eb0d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java @@ -45,9 +45,10 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props != null && + (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java index 5706cbf49e..8a5ff7df2d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java @@ -47,10 +47,11 @@ public class AnonymousSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE) || - props.containsKey(Sasl.POLICY_NOANONYMOUS))) + if (props != null && + (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE) || + props.containsKey(Sasl.POLICY_NOANONYMOUS))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java index 11b0f26e05..3144bfbce6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java @@ -45,9 +45,10 @@ public class PlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props != null && + (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE))) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index c4e2f1a322..65790e2e6f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,9 +23,25 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import com.sun.security.auth.UserPrincipal; +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +import javax.security.auth.Subject; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -38,18 +54,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.PrincipalHolder; +import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -58,24 +74,13 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; - -public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject +public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject { + private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -106,13 +111,12 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); private ServerTransaction _transaction; - + private final AtomicLong _txnStarts = new AtomicLong(0); private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - - private Principal _principal; + private final AtomicLong _txnUpdateTime = new AtomicLong(0); private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -125,27 +129,27 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); } + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) + { + super(connection, delegate, name, expiry); + _connectionConfig = connConfig; + _transaction = new AutoCommitTransaction(this.getMessageStore()); + + _reference = new WeakReference<Session>(this); + _id = getConfigStore().createId(); + getConfigStore().addConfiguredObject(this); + } + protected void setState(State state) { super.setState(state); if (state == State.OPEN) { - _actor.message(ChannelMessages.CREATE()); + _actor.message(ChannelMessages.CREATE()); } } - public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) - { - super(connection, delegate, name, expiry); - _connectionConfig = connConfig; - _transaction = new AutoCommitTransaction(this.getMessageStore()); - _principal = new UserPrincipal(connection.getAuthorizationID()); - _reference = new WeakReference(this); - _id = getConfigStore().createId(); - getConfigStore().addConfiguredObject(this); - } - private ConfigStore getConfigStore() { return getConnectionConfig().getConfigStore(); @@ -160,8 +164,8 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - - _transaction.enqueue(queues,message, new ServerTransaction.Action() + getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -189,6 +193,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo }); incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } @@ -196,6 +201,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo Runnable postIdSettingAction) { invoke(xfr, postIdSettingAction); + getConnectionModel().registerMessageDelivered(xfr.getBodySize()); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -331,7 +337,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } - public void removeDispositionListener(Method method) + public void removeDispositionListener(Method method) { _messageDispositionListenerMap.remove(method.getId()); } @@ -351,7 +357,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { task.doTask(this); } - + CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); } @@ -377,6 +383,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo entry.release(); } }); + updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -396,7 +403,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void unregister(Subscription_0_10 sub) { - _subscriptions.remove(sub.getName()); + _subscriptions.remove(sub.getConsumerTag().toString()); try { sub.getSendLock(); @@ -410,14 +417,14 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo catch (AMQException e) { // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + _logger.error("Failed to unregister subscription", e); } finally { sub.releaseSendLock(); } } - + public boolean isTransactional() { // this does not look great but there should only be one "non-transactional" @@ -425,6 +432,11 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo // theory return !(_transaction instanceof AutoCommitTransaction); } + + public boolean inTransaction() + { + return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; + } public void selectTx() { @@ -435,7 +447,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void commit() { _transaction.commit(); - + _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); @@ -444,13 +456,13 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo public void rollback() { _transaction.rollback(); - + _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } - + private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -460,7 +472,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo _txnCount.compareAndSet(0,1); } } - + private void decrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -471,6 +483,17 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } + /** + * Update last transaction activity timestamp + */ + public void updateTransactionalActivity() + { + if (isTransactional()) + { + _txnUpdateTime.set(System.currentTimeMillis()); + } + } + public Long getTxnStarts() { return _txnStarts.get(); @@ -490,10 +513,15 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { return _txnCount.get(); } - - public Principal getPrincipal() + + public Principal getAuthorizedPrincipal() + { + return ((ServerConnection) getConnection()).getAuthorizedPrincipal(); + } + + public Subject getAuthorizedSubject() { - return _principal; + return ((ServerConnection) getConnection()).getAuthorizedSubject(); } public void addSessionCloseTask(Task task) @@ -606,17 +634,47 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo return (LogSubject) this; } + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + if (inTransaction()) + { + long currentTime = System.currentTimeMillis(); + long openTime = currentTime - _transaction.getTransactionStartTime(); + long idleTime = currentTime - _txnUpdateTime.get(); + + // Log a warning on idle or open transactions + if (idleWarn > 0L && idleTime > idleWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); + _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); + } + else if (openWarn > 0L && openTime > openWarn) + { + CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); + _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); + } + + // Close connection for idle or open transactions that have timed out + if (idleClose > 0L && idleTime > idleClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + } + else if (openClose > 0L && openTime > openClose) + { + getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + } + } + } + public String toLogString() { return "[" + MessageFormat.format(CHANNEL_FORMAT, - getConnection().getConnectionId(), + ((ServerConnection) getConnection()).getConnectionId(), getClientID(), ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), getVirtualHost().getName(), getChannel()) + "] "; - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 49678055f9..5f3446236c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -57,7 +61,8 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.protocol.v1_0.LinkRegistry; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -66,7 +71,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; @@ -94,7 +99,7 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - private AuthenticationManager _authenticationManager; + private final AuthenticationManager _authenticationManager; private SecurityManager _securityManager; @@ -106,11 +111,12 @@ public class VirtualHostImpl implements VirtualHost private BrokerConfig _broker; private UUID _id; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); public IConnectionRegistry getConnectionRegistry() { @@ -157,12 +163,12 @@ public class VirtualHostImpl implements VirtualHost public String getObjectInstanceName() { - return _name.toString(); + return ObjectName.quote(_name); } public String getName() { - return _name.toString(); + return _name; } public VirtualHostImpl getVirtualHost() @@ -171,22 +177,11 @@ public class VirtualHostImpl implements VirtualHost } } - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception - { - this(appRegistry, hostConfig, null); - } - - - public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - this(ApplicationRegistry.getInstance(),hostConfig,store); - } - - private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception { if (hostConfig == null) { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + throw new IllegalArgumentException("HostConfig cannot be null"); } _appRegistry = appRegistry; @@ -240,11 +235,13 @@ public class VirtualHostImpl implements VirtualHost initialiseMessageStore(hostConfig); } - _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration); + _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + + initialiseStatistics(); } private void initialiseHouseKeeping(long period) @@ -277,19 +274,30 @@ public class VirtualHostImpl implements VirtualHost // house keeping task from running. } } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + for (AMQSessionModel session : connection.getSessionModels()) + { + _logger.debug("Checking for long running open transactions on session " + session); + try + { + session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } } } scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); - class ForceChannelClosuresTask extends TimerTask - { - public void run() - { - _connectionRegistry.expireClosedChannels(); - } - } - Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -442,46 +450,57 @@ public class VirtualHostImpl implements VirtualHost private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + String queueName = queue.getName(); if (queue.isDurable()) { getDurableConfigurationStore().createQueue(queue); } + //get the exchange name (returns default exchange name if none was specified) String exchangeName = queueConfiguration.getExchange(); - Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if (exchange == null) - { - exchange = _exchangeRegistry.getDefaultExchange(); - } - + Exchange exchange = _exchangeRegistry.getExchange(exchangeName); if (exchange == null) { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); } - List routingKeys = queueConfiguration.getRoutingKeys(); - if (routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getNameShortString()); - } + Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); + + //get routing keys in configuration (returns empty list if none are defined) + List<?> routingKeys = queueConfiguration.getRoutingKeys(); for (Object routingKeyNameObj : routingKeys) { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - if (_logger.isInfoEnabled()) + String routingKey = String.valueOf(routingKeyNameObj); + + if (exchange.equals(defaultExchange) && !queueName.equals(routingKey)) { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); + throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + + "' to the default exchange with a key other than the queue name: " + routingKey); } - _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null); + + configureBinding(queue, exchange, routingKey); + } + + if (!exchange.equals(defaultExchange)) + { + //bind the queue to the named exchange using its name + configureBinding(queue, exchange, queueName); } - if (exchange != _exchangeRegistry.getDefaultExchange()) + //ensure the queue is bound to the default exchange using its name + configureBinding(queue, defaultExchange, queueName); + } + + private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException + { + if (_logger.isInfoEnabled()) { - _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null); + _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); } + _bindingFactory.addBinding(routingKey, queue, exchange, null); } public String getName() @@ -623,6 +642,80 @@ public class VirtualHostImpl implements VirtualHost { return _bindingFactory; } + + public void registerMessageDelivered(long messageSize) + { + if (isStatisticsEnabled()) + { + _messagesDelivered.registerEvent(1L); + _dataDelivered.registerEvent(messageSize); + } + _appRegistry.registerMessageDelivered(messageSize); + } + + public void registerMessageReceived(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messagesReceived.registerEvent(1L, timestamp); + _dataReceived.registerEvent(messageSize, timestamp); + } + _appRegistry.registerMessageReceived(messageSize, timestamp); + } + + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + public void resetStatistics() + { + _messagesDelivered.reset(); + _dataDelivered.reset(); + _messagesReceived.reset(); + _dataReceived.reset(); + + for (AMQConnectionModel connection : _connectionRegistry.getConnections()) + { + connection.resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && + _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } public void createBrokerConnection(final String transport, final String host, @@ -659,17 +752,6 @@ public class VirtualHostImpl implements VirtualHost } } - public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) - { - LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); - if(linkRegistry == null) - { - linkRegistry = new LinkRegistry(); - _linkRegistry.put(remoteContainerId, linkRegistry); - } - return linkRegistry; - } - public ConfigStore getConfigStore() { return getApplicationRegistry().getConfigStore(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 8755724cfc..6db1560fb7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase return bind(queueName, queueName, getHeadersMap(bindings)); } - + protected void unbind(TestQueue queue, String... bindings) throws AMQException { String queueName = queue.getName(); //TODO - check this exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings))); } - + protected int getCount() { return count; @@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase exchange.onBind(new Binding(null,key, queue, exchange, args)); return queue; } - + protected int route(Message m) throws AMQException { @@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase } } - + static Map<String,Object> getHeadersMap(String... entries) { if(entries == null) { return null; } - + Map<String,Object> headers = new HashMap<String,Object>(); for (String s : entries) @@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase static ContentHeaderBody getContentHeader(FieldTable headers) { ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); + header.setProperties(getProperties(headers)); return header; } @@ -428,21 +428,11 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase //To change body of implemented methods use File | Settings | File Templates. } - public void reject(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. @@ -482,6 +472,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + public boolean isDequeued() + { + return false; + } + + public boolean isDispensed() + { + return false; + } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 790511017a..422105e410 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -31,10 +31,10 @@ import org.apache.qpid.server.message.ServerMessage; /** * Mock Server Message allowing its persistent flag to be controlled from test. */ -class MockServerMessage implements ServerMessage<MockServerMessage> +class MockServerMessage implements ServerMessage { /** - * + * */ private final boolean persistent; @@ -46,67 +46,56 @@ class MockServerMessage implements ServerMessage<MockServerMessage> this.persistent = persistent; } - public boolean isPersistent() { return persistent; } - - public MessageReference<MockServerMessage> newReference() + public MessageReference newReference() { throw new NotImplementedException(); } - public boolean isImmediate() { throw new NotImplementedException(); } - public long getSize() { throw new NotImplementedException(); } - public SessionConfig getSessionConfig() { throw new NotImplementedException(); } - public String getRoutingKey() { throw new NotImplementedException(); } - public AMQMessageHeader getMessageHeader() { throw new NotImplementedException(); } - public long getExpiration() { throw new NotImplementedException(); } - public int getContent(ByteBuffer buf, int offset) { throw new NotImplementedException(); } - public long getArrivalTime() { throw new NotImplementedException(); } - public Long getMessageNumber() { return 0L; |