diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache')
91 files changed, 2277 insertions, 3889 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index f0f7678cd9..a6f319cb1f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1096,11 +1096,6 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return Boolean.FALSE; } - public Long getFlowStoppedCount() - { - return 0L; - } - public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, final Long request) { @@ -1311,12 +1306,6 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.isShadow(); } - - public Boolean getUserProxyAuth() - { - // TODO - return false; - } } private class SessionDelegate implements BrokerSchema.SessionDelegate diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index d1ea5dba69..a612f280d6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -327,74 +327,4 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { return getObjectNameForSingleInstanceMBean(); } - - public void resetStatistics() throws Exception - { - getVirtualHost().resetStatistics(); - } - - public double getPeakMessageDeliveryRate() - { - return getVirtualHost().getMessageDeliveryStatistics().getPeak(); - } - - public double getPeakDataDeliveryRate() - { - return getVirtualHost().getDataDeliveryStatistics().getPeak(); - } - - public double getMessageDeliveryRate() - { - return getVirtualHost().getMessageDeliveryStatistics().getRate(); - } - - public double getDataDeliveryRate() - { - return getVirtualHost().getDataDeliveryStatistics().getRate(); - } - - public long getTotalMessagesDelivered() - { - return getVirtualHost().getMessageDeliveryStatistics().getTotal(); - } - - public long getTotalDataDelivered() - { - return getVirtualHost().getDataDeliveryStatistics().getTotal(); - } - - public double getPeakMessageReceiptRate() - { - return getVirtualHost().getMessageReceiptStatistics().getPeak(); - } - - public double getPeakDataReceiptRate() - { - return getVirtualHost().getDataReceiptStatistics().getPeak(); - } - - public double getMessageReceiptRate() - { - return getVirtualHost().getMessageReceiptStatistics().getRate(); - } - - public double getDataReceiptRate() - { - return getVirtualHost().getDataReceiptStatistics().getRate(); - } - - public long getTotalMessagesReceived() - { - return getVirtualHost().getMessageReceiptStatistics().getTotal(); - } - - public long getTotalDataReceived() - { - return getVirtualHost().getDataReceiptStatistics().getTotal(); - } - - public boolean isStatisticsEnabled() - { - return getVirtualHost().isStatisticsEnabled(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8141533045..4f86c82578 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,7 +22,6 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -142,7 +141,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); @@ -202,11 +200,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel return !(_transaction instanceof AutoCommitTransaction); } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - private void incrementOutstandingTxnsIfNecessary() { if(isTransactional()) @@ -302,6 +295,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel }); deliverCurrentMessageIfComplete(); + } } @@ -339,15 +333,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional())); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } } } finally { - long bodySize = _currentMessage.getSize(); - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); - _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } } @@ -804,7 +794,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); - updateTransactionalActivity(); } private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) @@ -979,17 +968,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } - /** - * Update last transaction activity timestamp - */ - private void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - public String toString() { return "["+_session.toString()+":"+_channelId+"]"; @@ -1040,7 +1018,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); - _session.registerMessageDelivered(entry.getMessage().getSize()); } }; @@ -1079,11 +1056,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = - header.getProperties() instanceof BasicContentHeaderProperties - ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() + header.properties instanceof BasicContentHeaderProperties + ? ((BasicContentHeaderProperties) header.properties).getUserId() : null; - return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); + return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1430,36 +1407,4 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { _session.mgmtCloseChannel(_channelId); } - - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - } - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java deleted file mode 100644 index ffc323a23b..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; -import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; -import org.apache.qpid.server.information.management.ServerInformationMBean; -import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.BrokerActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.management.LoggingManagementMBean; -import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.mina.MinaNetworkTransport; - -public class Broker -{ - private static final int IPV4_ADDRESS_LENGTH = 4; - private static final char IPV4_LITERAL_SEPARATOR = '.'; - - protected static class InitException extends RuntimeException - { - private static final long serialVersionUID = 1L; - - InitException(String msg, Throwable cause) - { - super(msg, cause); - } - } - - public void shutdown() - { - ApplicationRegistry.remove(); - } - - public void startup() throws Exception - { - startup(new BrokerOptions()); - } - - public void startup(BrokerOptions options) throws Exception - { - try - { - CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); - startupImpl(options); - } - finally - { - CurrentActor.remove(); - } - } - - private void startupImpl(final BrokerOptions options) throws Exception - { - final String qpidHome = options.getQpidHome(); - final File configFile = getConfigFile(options.getConfigFile(), - BrokerOptions.DEFAULT_CONFIG_FILE, qpidHome, true); - - CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); - - File logConfigFile = getConfigFile(options.getLogConfigFile(), - BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false); - - configureLogging(logConfigFile, options.getLogWatchFrequency()); - - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - ServerConfiguration serverConfig = config.getConfiguration(); - updateManagementPort(serverConfig, options.getJmxPort()); - - ApplicationRegistry.initialise(config); - - // We have already loaded the BrokerMessages class by this point so we - // need to refresh the locale setting incase we had a different value in - // the configuration. - BrokerMessages.reload(); - - // AR.initialise() sets and removes its own actor so we now need to set the actor - // for the remainder of the startup, and the default actor if the stack is empty - CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); - CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); - GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); - - try - { - configureLoggingManagementMBean(logConfigFile, options.getLogWatchFrequency()); - - ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); - configMBean.register(); - - ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config); - sysInfoMBean.register(); - - Set<Integer> ports = new HashSet<Integer>(options.getPorts()); - if(ports.isEmpty()) - { - parsePortList(ports, serverConfig.getPorts()); - } - - Set<Integer> sslPorts = new HashSet<Integer>(options.getSSLPorts()); - if(sslPorts.isEmpty()) - { - parsePortList(sslPorts, serverConfig.getSSLPorts()); - } - - Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10)); - if(exclude_0_10.isEmpty()) - { - parsePortList(exclude_0_10, serverConfig.getPortExclude010()); - } - - Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1)); - if(exclude_0_9_1.isEmpty()) - { - parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); - } - - Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9)); - if(exclude_0_9.isEmpty()) - { - parsePortList(exclude_0_9, serverConfig.getPortExclude09()); - } - - Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8)); - if(exclude_0_8.isEmpty()) - { - parsePortList(exclude_0_8, serverConfig.getPortExclude08()); - } - - String bindAddr = options.getBind(); - if (bindAddr == null) - { - bindAddr = serverConfig.getBind(); - } - - InetAddress bindAddress = null; - if (bindAddr.equals(WILDCARD_ADDRESS)) - { - bindAddress = new InetSocketAddress(0).getAddress(); - } - else - { - bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); - } - String hostName = bindAddress.getCanonicalHostName(); - - if (!serverConfig.getSSLOnly()) - { - for(int port : ports) - { - Set<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - - if(exclude_0_10.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_10); - } - - if(exclude_0_9_1.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9_1); - } - if(exclude_0_9.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9); - } - if(exclude_0_8.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_8); - } - - NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); - - IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); - MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); - - transport.accept(settings, protocolEngineFactory, null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(transport,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - } - } - - if (serverConfig.getEnableSSL()) - { - String keystorePath = serverConfig.getKeystorePath(); - String keystorePassword = serverConfig.getKeystorePassword(); - String certType = serverConfig.getCertType(); - SSLContextFactory sslFactory = - new SSLContextFactory(keystorePath, keystorePassword, certType); - - for(int sslPort : sslPorts) - { - NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); - - IncomingNetworkTransport transport = new MinaNetworkTransport(); - - transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory); - - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), - new QpidAcceptor(transport,"TCP")); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); - } - } - - CurrentActor.get().message(BrokerMessages.READY()); - } - finally - { - // Startup is complete so remove the AR initialised Startup actor - CurrentActor.remove(); - } - } - - private File getConfigFile(final String fileName, - final String defaultFileName, - final String qpidHome, boolean throwOnFileNotFound) throws InitException - { - File configFile = null; - if (fileName != null) - { - configFile = new File(fileName); - } - else - { - configFile = new File(qpidHome, defaultFileName); - } - - if (!configFile.exists() && throwOnFileNotFound) - { - String error = "File " + fileName + " could not be found. Check the file exists and is readable."; - - if (qpidHome == null) - { - error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set."; - } - - throw new InitException(error, null); - } - - return configFile; - } - - public static void parsePortList(Set<Integer> output, List<?> ports) throws InitException - { - if(ports != null) - { - for(Object o : ports) - { - try - { - output.add(Integer.parseInt(String.valueOf(o))); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid port: " + o, e); - } - } - } - } - - /** - * Update the configuration data with the management port. - * @param configuration - * @param managementPort The string from the command line - */ - private void updateManagementPort(ServerConfiguration configuration, Integer managementPort) - { - if (managementPort != null) - { - try - { - configuration.setJMXManagementPort(managementPort); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid management port: " + managementPort, null); - } - } - } - - private byte[] parseIP(String address) throws Exception - { - char[] literalBuffer = address.toCharArray(); - int byteCount = 0; - int currByte = 0; - byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; - for (int i = 0; i < literalBuffer.length; i++) - { - char currChar = literalBuffer[i]; - if ((currChar >= '0') && (currChar <= '9')) - { - currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); - } - - if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) - { - ip[byteCount++] = (byte) currByte; - currByte = 0; - } - } - - if (byteCount != 4) - { - throw new Exception("Invalid IP address: " + address); - } - return ip; - } - - private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException - { - if (logConfigFile.exists() && logConfigFile.canRead()) - { - CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); - - if (logWatchTime > 0) - { - System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " - + logWatchTime + " seconds"); - // log4j expects the watch interval in milliseconds - try - { - QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); - } - catch (Exception e) - { - throw new InitException(e.getMessage(),e); - } - } - else - { - try - { - QpidLog4JConfigurator.configure(logConfigFile.getPath()); - } - catch (Exception e) - { - throw new InitException(e.getMessage(),e); - } - } - } - else - { - System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); - System.err.println("Using the fallback internal log4j.properties configuration"); - - InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); - if(propsFile == null) - { - throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); - } - else - { - try - { - Properties fallbackProps = new Properties(); - fallbackProps.load(propsFile); - PropertyConfigurator.configure(fallbackProps); - } - finally - { - propsFile.close(); - } - } - } - } - - private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception - { - LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); - - blm.register(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java deleted file mode 100644 index b83da92660..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class BrokerOptions -{ - /** serialVersionUID */ - private static final long serialVersionUID = 8051825964945442234L; - - public static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; - public static final String DEFAULT_LOG_CONFIG_FILE = "etc/log4j.xml"; - public static final String QPID_HOME = "QPID_HOME"; - - public static final String PORTS = "p"; - public static final String SSL_PORTS = "s"; - public static final String BIND = "b"; - public static final String MANAGEMENT = "m"; - public static final String LOG_CONFIG = "l"; - public static final String WATCH = "w"; - public static final String CONFIG = "c"; - - private final Set<Integer> _ports = new HashSet<Integer>(); - private final Set<Integer> _sslPorts = new HashSet<Integer>(); - private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>(); - - private String _configFile; - private String _logConfigFile; - private String _bind; - private Integer _jmxPort; - - private Integer _logWatchFrequency = 0; - - public void addPort(final int port) - { - _ports.add(port); - } - - public void addSSLPort(final int sslPort) - { - _sslPorts.add(sslPort); - } - - public Set<Integer> getPorts() - { - return Collections.unmodifiableSet(_ports); - } - - public Set<Integer> getSSLPorts() - { - return Collections.unmodifiableSet(_sslPorts); - } - - public String getConfigFile() - { - return _configFile; - } - - public void setConfigFile(final String configFile) - { - _configFile = configFile; - } - - public String getLogConfigFile() - { - return _logConfigFile; - } - - public void setLogConfigFile(final String logConfigFile) - { - _logConfigFile = logConfigFile; - } - - public Integer getJmxPort() - { - return _jmxPort; - } - - public void setJmxPort(final int jmxPort) - { - _jmxPort = jmxPort; - } - - public String getQpidHome() - { - return System.getProperty(QPID_HOME); - } - - public Set<Integer> getExcludedPorts(final ProtocolExclusion excludeProtocol) - { - final Set<Integer> excludedPorts = _exclusionMap.get(excludeProtocol); - return excludedPorts == null ? Collections.<Integer>emptySet() : excludedPorts; - } - - public void addExcludedPort(final ProtocolExclusion excludeProtocol, final int port) - { - if (!_exclusionMap.containsKey(excludeProtocol)) - { - _exclusionMap.put(excludeProtocol, new HashSet<Integer>()); - } - - Set<Integer> ports = _exclusionMap.get(excludeProtocol); - ports.add(port); - } - - public String getBind() - { - return _bind; - } - - public void setBind(final String bind) - { - _bind = bind; - } - - public int getLogWatchFrequency() - { - return _logWatchFrequency; - } - - /** - * Set the frequency with which the log config file will be checked for updates. - * @param logWatchFrequency frequency in seconds - */ - public void setLogWatchFrequency(final int logWatchFrequency) - { - _logWatchFrequency = logWatchFrequency; - } -}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 449b52d737..71cf17ed60 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -28,9 +39,28 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; -import org.apache.qpid.server.Broker.InitException; +import org.apache.log4j.PropertyConfigurator; +import org.apache.log4j.xml.QpidLog4JConfigurator; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; +import org.apache.qpid.server.information.management.ServerInformationMBean; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.management.LoggingManagementMBean; +import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; import org.apache.qpid.server.registry.ApplicationRegistry; - +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.transport.QpidAcceptor; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** * Main entry point for AMQPD. @@ -38,41 +68,37 @@ import org.apache.qpid.server.registry.ApplicationRegistry; */ public class Main { - private final Options options = new Options(); - private CommandLine commandLine; + private static Logger _logger; - public static void main(String[] args) + private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; + + public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + public static final String QPID_HOME = "QPID_HOME"; + private static final int IPV4_ADDRESS_LENGTH = 4; + + private static final char IPV4_LITERAL_SEPARATOR = '.'; + + protected static class InitException extends Exception { - //if the -Dlog4j.configuration property has not been set, enable the init override - //to stop Log4J wondering off and picking up the first log4j.xml/properties file it - //finds from the classpath when we get the first Loggers - if(System.getProperty("log4j.configuration") == null) + InitException(String msg, Throwable cause) { - System.setProperty("log4j.defaultInitOverride", "true"); + super(msg, cause); } - - new Main(args); } - public Main(final String[] args) + protected final Options options = new Options(); + protected CommandLine commandLine; + + protected Main(String[] args) { setOptions(options); if (parseCommandline(args)) { - try - { - execute(); - } - catch(Exception e) - { - System.err.println("Exception during startup: " + e); - e.printStackTrace(); - shutdown(1); - } + execute(); } } - protected boolean parseCommandline(final String[] args) + protected boolean parseCommandline(String[] args) { try { @@ -90,7 +116,8 @@ public class Main } } - protected void setOptions(final Options options) + @SuppressWarnings("static-access") + protected void setOptions(Options options) { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); @@ -134,21 +161,16 @@ public class Main Option bind = OptionBuilder.withArgName("bind").hasArg() .withDescription("bind to the specified address. Overrides any value in the config file") - .withLongOpt("bind").create(BrokerOptions.BIND); + .withLongOpt("bind").create("b"); Option logconfig = OptionBuilder.withArgName("logconfig").hasArg() .withDescription("use the specified log4j xml configuration file. By " - + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE - + " in the same directory as the configuration file").withLongOpt("logconfig").create(BrokerOptions.LOG_CONFIG); + + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg() .withDescription("monitor the log file configuration file for changes. Units are seconds. " - + "Zero means do not check for changes.").withLongOpt("logwatch").create(BrokerOptions.WATCH); - - Option sslport = - OptionBuilder.withArgName("sslport").hasArg() - .withDescription("SSL port. Overrides any value in the config file") - .withLongOpt("sslport").create(BrokerOptions.SSL_PORTS); + + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); options.addOption(help); options.addOption(version); @@ -162,120 +184,435 @@ public class Main options.addOption(exclude0_8); options.addOption(mport); options.addOption(bind); - options.addOption(sslport); } - protected void execute() throws Exception + protected void execute() { - BrokerOptions options = new BrokerOptions(); - String configFile = commandLine.getOptionValue(BrokerOptions.CONFIG); - if(configFile != null) + // note this understands either --help or -h. If an option only has a long name you can use that but if + // an option has a short name and a long name you must use the short name here. + if (commandLine.hasOption("h")) { - options.setConfigFile(configFile); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("Qpid", options, true); } + else if (commandLine.hasOption("v")) + { + String ver = QpidProperties.getVersionString(); + + StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); - String logWatchConfig = commandLine.getOptionValue(BrokerOptions.WATCH); - if(logWatchConfig != null) + boolean first = true; + for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) + { + if (first) + { + first = false; + } + else + { + protocol.append(", "); + } + + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); + + } + + System.out.println(ver + " (" + protocol + ")"); + } + else { - options.setLogWatchFrequency(Integer.parseInt(logWatchConfig)); + try + { + CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); + startup(); + CurrentActor.remove(); + } + catch (InitException e) + { + System.out.println("Initialisation Error : " + e.getMessage()); + shutdown(1); + } + catch (Throwable e) + { + System.out.println("Error initialising message broker: " + e); + e.printStackTrace(); + shutdown(1); + } } + } - String logConfig = commandLine.getOptionValue(BrokerOptions.LOG_CONFIG); - if(logConfig != null) + protected void shutdown(int status) + { + ApplicationRegistry.removeAll(); + System.exit(status); + } + + protected void startup() throws Exception + { + final String QpidHome = System.getProperty(QPID_HOME); + final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); + final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); + if (!configFile.exists()) { - options.setLogConfigFile(logConfig); + String error = "File " + configFile + " could not be found. Check the file exists and is readable."; + + if (QpidHome == null) + { + error = error + "\nNote: " + QPID_HOME + " is not set."; + } + + throw new InitException(error, null); } + else + { + CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); + } + + String logConfig = commandLine.getOptionValue("l"); + String logWatchConfig = commandLine.getOptionValue("w", "0"); - String jmxPort = commandLine.getOptionValue(BrokerOptions.MANAGEMENT); - if(jmxPort != null) + int logWatchTime = 0; + try { - options.setJmxPort(Integer.parseInt(jmxPort)); + logWatchTime = Integer.parseInt(logWatchConfig); + } + catch (NumberFormatException e) + { + System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + + "a non-negative integer. Using default of zero (no watching configured"); } - String bindAddr = commandLine.getOptionValue(BrokerOptions.BIND); - if (bindAddr != null) + File logConfigFile; + if (logConfig != null) + { + logConfigFile = new File(logConfig); + configureLogging(logConfigFile, logWatchTime); + } + else { - options.setBind(bindAddr); + File configFileDirectory = configFile.getParentFile(); + logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME); + configureLogging(logConfigFile, logWatchTime); } - String[] portStr = commandLine.getOptionValues(BrokerOptions.PORTS); + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); + ServerConfiguration serverConfig = config.getConfiguration(); + updateManagementPort(serverConfig, commandLine.getOptionValue("m")); + + ApplicationRegistry.initialise(config); + + // We have already loaded the BrokerMessages class by this point so we + // need to refresh the locale setting incase we had a different value in + // the configuration. + BrokerMessages.reload(); + + // AR.initialise() sets and removes its own actor so we now need to set the actor + // for the remainder of the startup, and the default actor if the stack is empty + CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); + CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); + GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); + + + try + { + configureLoggingManagementMBean(logConfigFile, logWatchTime); + + ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); + configMBean.register(); + + ServerInformationMBean sysInfoMBean = + new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); + sysInfoMBean.register(); + + + String[] portStr = commandLine.getOptionValues("p"); + + Set<Integer> ports = new HashSet<Integer>(); + Set<Integer> exclude_0_10 = new HashSet<Integer>(); + Set<Integer> exclude_0_9_1 = new HashSet<Integer>(); + Set<Integer> exclude_0_9 = new HashSet<Integer>(); + Set<Integer> exclude_0_8 = new HashSet<Integer>(); + + if(portStr == null || portStr.length == 0) + { + + parsePortList(ports, serverConfig.getPorts()); + parsePortList(exclude_0_10, serverConfig.getPortExclude010()); + parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); + parsePortList(exclude_0_9, serverConfig.getPortExclude09()); + parsePortList(exclude_0_8, serverConfig.getPortExclude08()); + + } + else + { + parsePortArray(ports, portStr); + parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10")); + parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1")); + parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9")); + parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8")); + + } + + + + + String bindAddr = commandLine.getOptionValue("b"); + if (bindAddr == null) + { + bindAddr = serverConfig.getBind(); + } + InetAddress bindAddress = null; + + + + if (bindAddr.equals("wildcard")) + { + bindAddress = new InetSocketAddress(0).getAddress(); + } + else + { + bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + } + + String hostName = bindAddress.getCanonicalHostName(); + + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + + if (!serverConfig.getSSLOnly()) + { + + for(int port : ports) + { + + NetworkDriver driver = new MINANetworkDriver(); + + Set<VERSION> supported = EnumSet.allOf(VERSION.class); + + if(exclude_0_10.contains(port)) + { + supported.remove(VERSION.v0_10); + } + + if(exclude_0_9_1.contains(port)) + { + supported.remove(VERSION.v0_9_1); + } + if(exclude_0_9.contains(port)) + { + supported.remove(VERSION.v0_9); + } + if(exclude_0_8.contains(port)) + { + supported.remove(VERSION.v0_8); + } + + MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); + + + + driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); + + } + + } + + if (serverConfig.getEnableSSL()) + { + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort())); + } + + CurrentActor.get().message(BrokerMessages.READY()); + + } + finally + { + // Startup is complete so remove the AR initialised Startup actor + CurrentActor.remove(); + } + + + + } + + private void parsePortArray(Set<Integer> ports, String[] portStr) + throws InitException + { if(portStr != null) { - parsePortArray(options, portStr, false); - for(ProtocolExclusion pe : ProtocolExclusion.values()) + for(int i = 0; i < portStr.length; i++) { - parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); + try + { + ports.add(Integer.parseInt(portStr[i])); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + portStr[i], e); + } } } + } - String[] sslPortStr = commandLine.getOptionValues(BrokerOptions.SSL_PORTS); - if(sslPortStr != null) + private void parsePortList(Set<Integer> output, List input) + throws InitException + { + if(input != null) { - parsePortArray(options, sslPortStr, true); - for(ProtocolExclusion pe : ProtocolExclusion.values()) + for(Object port : input) { - parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe); + try + { + output.add(Integer.parseInt(String.valueOf(port))); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid port: " + port, e); + } + } + } + } + + /** + * Update the configuration data with the management port. + * @param configuration + * @param managementPort The string from the command line + */ + private void updateManagementPort(ServerConfiguration configuration, String managementPort) + { + if (managementPort != null) + { + try + { + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); + } + catch (NumberFormatException e) + { + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); } } - - startBroker(options); } - protected void startBroker(final BrokerOptions options) throws Exception + public static void main(String[] args) { - Broker broker = new Broker(); - broker.startup(options); + //if the -Dlog4j.configuration property has not been set, enable the init override + //to stop Log4J wondering off and picking up the first log4j.xml/properties file it + //finds from the classpath when we get the first Loggers + if(System.getProperty("log4j.configuration") == null) + { + System.setProperty("log4j.defaultInitOverride", "true"); + } + + //now that the override status is know, we can instantiate the Loggers + _logger = Logger.getLogger(Main.class); + + new Main(args); } - protected void shutdown(final int status) + private byte[] parseIP(String address) throws Exception { - ApplicationRegistry.remove(); - System.exit(status); + char[] literalBuffer = address.toCharArray(); + int byteCount = 0; + int currByte = 0; + byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; + for (int i = 0; i < literalBuffer.length; i++) + { + char currChar = literalBuffer[i]; + if ((currChar >= '0') && (currChar <= '9')) + { + currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); + } + + if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) + { + ip[byteCount++] = (byte) currByte; + currByte = 0; + } + } + + if (byteCount != 4) + { + throw new Exception("Invalid IP address: " + address); + } + return ip; } - private static void parsePortArray(final BrokerOptions options,final Object[] ports, - final boolean ssl) throws InitException + private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException { - if(ports != null) + if (logConfigFile.exists() && logConfigFile.canRead()) { - for(int i = 0; i < ports.length; i++) + CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath())); + + if (logWatchTime > 0) { + System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + + logWatchTime + " seconds"); + // log4j expects the watch interval in milliseconds try { - if(ssl) - { - options.addSSLPort(Integer.parseInt(String.valueOf(ports[i]))); - } - else - { - options.addPort(Integer.parseInt(String.valueOf(ports[i]))); - } + QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); } - catch (NumberFormatException e) + catch (Exception e) + { + throw new InitException(e.getMessage(),e); + } + } + else + { + try { - throw new InitException("Invalid port: " + ports[i], e); + QpidLog4JConfigurator.configure(logConfigFile.getPath()); + } + catch (Exception e) + { + throw new InitException(e.getMessage(),e); } } } - } - - private static void parsePortArray(final BrokerOptions options, final Object[] ports, - final ProtocolExclusion excludedProtocol) throws InitException - { - if(ports != null) + else { - for(int i = 0; i < ports.length; i++) + System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); + System.err.println("Using the fallback internal log4j.properties configuration"); + + InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties"); + if(propsFile == null) + { + throw new IOException("Unable to load the fallback internal log4j.properties configuration file"); + } + else { try { - options.addExcludedPort(excludedProtocol, - Integer.parseInt(String.valueOf(ports[i]))); + Properties fallbackProps = new Properties(); + fallbackProps.load(propsFile); + PropertyConfigurator.configure(fallbackProps); } - catch (NumberFormatException e) + finally { - throw new InitException("Invalid port for exclusion: " + ports[i], e); + propsFile.close(); } } } } + + private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception + { + LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); + + blm.register(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java deleted file mode 100644 index 22d97d36dd..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolExclusion.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import java.util.HashMap; -import java.util.Map; - -public enum ProtocolExclusion -{ - v0_8("exclude-0-8","--exclude-0-8"), - v0_9("exclude-0-9", "--exclude-0-9"), - v0_9_1("exclude-0-9-1", "--exclude-0-9-1"), - v0_10("exclude-0-10", "--exclude-0-10"); - - private static final Map<String, ProtocolExclusion> MAP = new HashMap<String,ProtocolExclusion>(); - - static - { - for(ProtocolExclusion pe : ProtocolExclusion.values()) - { - MAP.put(pe.getArg(), pe); - } - } - - private String _arg; - private String _excludeName; - - private ProtocolExclusion(final String excludeName, final String arg) - { - _excludeName = excludeName; - _arg = arg; - } - - public String getArg() - { - return _arg; - } - - public String getExcludeName() - { - return _excludeName; - } - - public static ProtocolExclusion lookup(final String arg) - { - ProtocolExclusion ex = MAP.get(arg); - - if(ex == null) - { - throw new IllegalArgumentException(arg + " is not a valid protocol exclusion"); - } - - return ex; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 0621b87f0a..7197ec8cdc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -20,8 +20,6 @@ package org.apache.qpid.server.configuration; -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - import java.io.File; import java.util.Collections; import java.util.HashMap; @@ -44,7 +42,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.NetworkDriverConfiguration; import sun.misc.Signal; import sun.misc.SignalHandler; @@ -54,7 +52,9 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class); // Default Configuration values - public static final int DEFAULT_BUFFER_SIZE = 262144; + public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144; + public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144; + public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false; public static final String DEFAULT_STATUS_UPDATES = "on"; public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED"; @@ -63,7 +63,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public static final int DEFAULT_SSL_PORT = 8672; public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; public static final int DEFAULT_JMXPORT = 8999; - + public static final String QPID_HOME = "QPID_HOME"; public static final String QPID_WORK = "QPID_WORK"; public static final String LIB_DIR = "lib"; @@ -84,6 +84,9 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa // Configuration values to be read from the configuration file //todo Move all properties to static values to ensure system testing can be performed. + public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled"; + public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize"; + public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize"; public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket"; public static final String STATUS_UPDATES = "status-updates"; public static final String ADVANCED_LOCALE = "advanced.locale"; @@ -92,6 +95,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa envVarMap.put("QPID_PORT", "connector.port"); envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers"); envVarMap.put("QPID_SSLPORT", "connector.ssl.port"); + envVarMap.put("QPID_NIO", "connector.qpidnio"); envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool"); envVarMap.put("QPID_JMXPORT", "management.jmxport"); envVarMap.put("QPID_FRAMESIZE", "advanced.framesize"); @@ -144,7 +148,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa } catch (Exception e) { - _logger.info("Signal HUP not supported for OS: " + System.getProperty("os.name")); + _logger.error("Signal HUP not supported for OS: " + System.getProperty("os.name")); // We're on something that doesn't handle SIGHUP, how sad, Windows. } } @@ -201,29 +205,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa @Override public void validateConfiguration() throws ConfigurationException { - // Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL. - // This ensure that users remove the element from their configuration file. - - if (getListValue("security.jmx.access").size() > 0) - { - String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml." - + (_configFile == null ? "" : " Configuration file : " + _configFile); - throw new ConfigurationException(message); - } - - if (getListValue("security.jmx.principal-database").size() > 0) - { - String message = "Validation error : security/jmx/principal-database is no longer a supported element within the configuration xml." - + (_configFile == null ? "" : " Configuration file : " + _configFile); - throw new ConfigurationException(message); - } - - if (getListValue("security.principal-databases.principal-database(0).class").size() > 0) - { - String message = "Validation error : security/principal-databases is no longer supported within the configuration xml." - + (_configFile == null ? "" : " Configuration file : " + _configFile); - throw new ConfigurationException(message); - } + //Currently doesn't do validation } /* @@ -521,11 +503,58 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa _virtualHosts.put(config.getName(), config); } + public List<String> getPrincipalDatabaseNames() + { + return getListValue("security.principal-databases.principal-database.name"); + } + + public List<String> getPrincipalDatabaseClass() + { + return getListValue("security.principal-databases.principal-database.class"); + } + + public List<String> getPrincipalDatabaseAttributeNames(int index) + { + String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name"; + return getListValue(name); + } + + public List<String> getPrincipalDatabaseAttributeValues(int index) + { + String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value"; + return getListValue(name); + } + + public List<String> getManagementPrincipalDBs() + { + return getListValue("security.jmx.principal-database"); + } + + public List<String> getManagementAccessList() + { + return getListValue("security.jmx.access"); + } + public int getFrameSize() { return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE); } + public boolean getProtectIOEnabled() + { + return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED); + } + + public int getBufferReadLimit() + { + return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE); + } + + public int getBufferWriteLimit() + { + return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE); + } + public boolean getSynchedClocks() { return getBooleanValue("advanced.synced-clocks"); @@ -536,6 +565,11 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getBooleanValue("security.msg-auth"); } + public String getJMXPrincipalDatabase() + { + return getStringValue("security.jmx.principal-database"); + } + public String getManagementKeyStorePath() { return getStringValue("management.ssl.keyStorePath"); @@ -616,14 +650,14 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getLongValue("flowResumeCapacity", getCapacity()); } - public int getConnectorProcessors() + public int getProcessors() { return getIntValue("connector.processors", 4); } public List getPorts() { - return getListValue("connector.port", Collections.<Integer>singletonList(DEFAULT_PORT)); + return getListValue("connector.port", Collections.singletonList(DEFAULT_PORT)); } public List getPortExclude010() @@ -648,17 +682,17 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public String getBind() { - return getStringValue("connector.bind", WILDCARD_ADDRESS); + return getStringValue("connector.bind", "wildcard"); } public int getReceiveBufferSize() { - return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE); + return getIntValue("connector.socketReceiveBuffer", 32767); } public int getWriteBufferSize() { - return getIntValue("connector.socketWriteBuffer", DEFAULT_BUFFER_SIZE); + return getIntValue("connector.socketWriteBuffer", 32767); } public boolean getTcpNoDelay() @@ -681,9 +715,9 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getBooleanValue("connector.ssl.sslOnly"); } - public List getSSLPorts() + public int getSSLPort() { - return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT)); + return getIntValue("connector.ssl.port", DEFAULT_SSL_PORT); } public String getKeystorePath() @@ -701,6 +735,11 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getStringValue("connector.ssl.certType", "SunX509"); } + public boolean getQpidNIO() + { + return getBooleanValue("connector.qpidnio"); + } + public boolean getUseBiasedWrites() { return getBooleanValue("advanced.useWriteBiasedPool"); @@ -728,34 +767,57 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa DEFAULT_HOUSEKEEPING_PERIOD)); } - public long getStatisticsSamplePeriod() + public NetworkDriverConfiguration getNetworkConfiguration() { - return getConfig().getLong("statistics.sample.period", 5000L); - } + return new NetworkDriverConfiguration() + { - public boolean isStatisticsGenerationBrokerEnabled() - { - return getConfig().getBoolean("statistics.generation.broker", false); - } + public Integer getTrafficClass() + { + return null; + } - public boolean isStatisticsGenerationVirtualhostsEnabled() - { - return getConfig().getBoolean("statistics.generation.virtualhosts", false); - } + public Boolean getTcpNoDelay() + { + // Can't call parent getTcpNoDelay since it just calls this one + return getBooleanValue("connector.tcpNoDelay", true); + } - public boolean isStatisticsGenerationConnectionsEnabled() - { - return getConfig().getBoolean("statistics.generation.connections", false); - } + public Integer getSoTimeout() + { + return null; + } - public long getStatisticsReportingPeriod() - { - return getConfig().getLong("statistics.reporting.period", 0L); - } + public Integer getSoLinger() + { + return null; + } - public boolean isStatisticsReportResetEnabled() - { - return getConfig().getBoolean("statistics.reporting.reset", false); + public Integer getSendBufferSize() + { + return getBufferWriteLimit(); + } + + public Boolean getReuseAddress() + { + return null; + } + + public Integer getReceiveBufferSize() + { + return getBufferReadLimit(); + } + + public Boolean getOOBInline() + { + return null; + } + + public Boolean getKeepAlive() + { + return null; + } + }; } public int getMaxChannelCount() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java deleted file mode 100644 index 81dfcb4465..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration; - -import org.apache.qpid.transport.NetworkTransportConfiguration; - -public class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration -{ - private final ServerConfiguration _serverConfig; - private final int _port; - private final String _host; - private final String _transport; - - public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig, - final int port, final String host, - final String transport) - { - _serverConfig = serverConfig; - _port = port; - _host = host; - _transport = transport; - } - - public Boolean getTcpNoDelay() - { - return _serverConfig.getTcpNoDelay(); - } - - public Integer getSendBufferSize() - { - return _serverConfig.getWriteBufferSize(); - } - - public Integer getReceiveBufferSize() - { - return _serverConfig.getReceiveBufferSize(); - } - - public Integer getPort() - { - return _port; - } - - public String getHost() - { - return _host; - } - - public String getTransport() - { - return _transport; - } - - public Integer getConnectorProcessors() - { - return _serverConfig.getConnectorProcessors(); - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index a710230616..d9d7083543 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -306,37 +306,11 @@ public class VirtualHostConfiguration extends ConfigurationPlugin @Override public void validateConfiguration() throws ConfigurationException { - // QPID-3249. Support for specifying authentication name at vhost level is no longer supported. - if (getListValue("security.authentication.name").size() > 0) - { - String message = "Validation error : security/authentication/name is no longer a supported element within the configuration xml." - + " It appears in virtual host definition : " + _name; - throw new ConfigurationException(message); - } + //Currently doesn't do validation } public int getHouseKeepingThreadCount() { return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors()); } - - public long getTransactionTimeoutOpenWarn() - { - return getLongValue("transactionTimeout.openWarn", 0L); - } - - public long getTransactionTimeoutOpenClose() - { - return getLongValue("transactionTimeout.openClose", 0L); - } - - public long getTransactionTimeoutIdleWarn() - { - return getLongValue("transactionTimeout.idleWarn", 0L); - } - - public long getTransactionTimeoutIdleClose() - { - return getLongValue("transactionTimeout.idleClose", 0L); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java index b4f82649b0..82b576ea51 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -24,7 +24,6 @@ import org.apache.commons.configuration.ConversionException; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.ConfigurationManager; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import java.util.Collections; import java.util.HashMap; @@ -139,28 +138,10 @@ public abstract class ConfigurationPlugin } } - offerRemainingConfigurationToOtherPlugins(path, configuration, elements); - - validateConfiguration(); - } - - private void offerRemainingConfigurationToOtherPlugins(String path, - Configuration configuration, Set<String> elements) throws ConfigurationException - { - final IApplicationRegistry appRegistry = safeGetApplicationRegistryInstance(); - - if (appRegistry == null) - { - // We see this happen during shutdown due to asynchronous reconfig using IO threads. - // Need to remove the responsibility for offering configuration to other class. - _logger.info("Cannot offer remaining config to other plugins, can't find app registry"); - return; - } - - final ConfigurationManager configurationManager = appRegistry.getConfigurationManager(); // Process the elements in the configuration for (String element : elements) { + ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager(); Configuration handled = element.length() == 0 ? configuration : configuration.subset(element); String configurationElement = element; @@ -181,18 +162,8 @@ public abstract class ConfigurationPlugin _pluginConfiguration.put(plugin.getClass().getName(), plugin); } } - } - private IApplicationRegistry safeGetApplicationRegistryInstance() - { - try - { - return ApplicationRegistry.getInstance(); - } - catch (IllegalStateException ise) - { - return null; - } + validateConfiguration(); } /** Helper method to print out list of keys in a {@link Configuration}. */ diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 3786c2020c..bac751e0c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,19 +20,19 @@ */ package org.apache.qpid.server.connection; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQProtocolSession; public class ConnectionRegistry implements IConnectionRegistry, Closeable { - private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); + private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); private Logger _logger = Logger.getLogger(ConnectionRegistry.class); @@ -40,41 +40,44 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { // None required } - - /** Close all of the currently open connections. */ - public void close() + + public void expireClosedChannels() { - while (!_registry.isEmpty()) + for (AMQProtocolSession connection : _registry) { - AMQConnectionModel connection = _registry.get(0); - closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + connection.closeIfLingeringClosedChannels(); } } - - public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) + + /** Close all of the currently open connections. */ + public void close() { - try - { - connection.close(cause, message); - } - catch (AMQException e) + while (!_registry.isEmpty()) { - _logger.warn("Error closing connection:" + e.getMessage()); + AMQProtocolSession connection = _registry.get(0); + + try + { + connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", + 0, 0, + connection.getProtocolOutputConverter().getProtocolMajorVersion(), + connection.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + catch (AMQException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); + } } } - public void registerConnection(AMQConnectionModel connnection) + public void registerConnection(AMQProtocolSession connnection) { _registry.add(connnection); } - public void deregisterConnection(AMQConnectionModel connnection) + public void deregisterConnection(AMQProtocolSession connnection) { _registry.remove(connnection); } - - public List<AMQConnectionModel> getConnections() - { - return new ArrayList<AMQConnectionModel>(_registry); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index b4f5bffa57..002269bbaa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,23 +20,18 @@ */ package org.apache.qpid.server.connection; -import java.util.List; - +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQConnectionModel; public interface IConnectionRegistry { + public void initialise(); public void close() throws AMQException; - - public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message); - - public List<AMQConnectionModel> getConnections(); - public void registerConnection(AMQConnectionModel connnection); + public void registerConnection(AMQProtocolSession connnection); + + public void deregisterConnection(AMQProtocolSession connnection); - public void deregisterConnection(AMQConnectionModel connnection); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java index 0f1b709475..7aeff2561e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java @@ -90,12 +90,12 @@ public abstract class AbstractExchangeMBean<T extends AbstractExchange> extends public String getObjectInstanceName() { - return ObjectName.quote(_exchange.getName()); + return _exchange.getNameShortString().toString(); } public String getName() { - return _exchange.getName(); + return _exchange.getNameShortString().toString(); } public String getExchangeType() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f21158cd0c..fa2fb9ead1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -258,6 +258,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString(); } _qpidConnection.setSessionFactory(new SessionFactory()); + _qpidConnection.setAuthorizationID(_username == null ? "" : _username); updateState(State.ESTABLISHING, State.OPERATIONAL); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 9848f90ea9..11fdeae2b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -37,8 +37,8 @@ import org.apache.qpid.server.queue.Filterable; public class PropertyExpression implements Expression { // Constants - defined the same as JMS - private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT } - + private static final int NON_PERSISTENT = 1; + private static final int PERSISTENT = 2; private static final int DEFAULT_PRIORITY = 4; private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); @@ -172,14 +172,13 @@ public class PropertyExpression implements Expression { public Object evaluate(Filterable message) { - JMSDeliveryMode mode = message.isPersistent() ? JMSDeliveryMode.PERSISTENT : - JMSDeliveryMode.NON_PERSISTENT; + int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT; if (_logger.isDebugEnabled()) { _logger.debug("JMSDeliveryMode is :" + mode); } - return mode.toString(); + return mode; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 09f35da41d..d4b79134a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.server.handler; - import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseBody; @@ -68,7 +68,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener } MethodRegistry methodRegistry = session.getMethodRegistry(); AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse()); - switch (authResult.getStatus()) + switch (authResult.status) { case ERROR: Exception cause = authResult.getCause(); @@ -88,10 +88,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener disposeSaslServer(session); break; case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject())); - } + _logger.info("Connected as: " + ss.getAuthorizationID()); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = @@ -99,13 +96,13 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionStartOkMethodHandler.getConfiguredFrameSize(), ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); - session.setAuthorizedSubject(authResult.getSubject()); + session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); disposeSaslServer(session); break; case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge); session.writeFrame(secureBody.generateFrame(0)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 2dd9a63540..4442f969c4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -65,6 +65,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< _logger.info("Locale selected: " + body.getLocale()); AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager(); + SaslServer ss = null; try { @@ -77,7 +78,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< session.setSaslServer(ss); - final AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse()); + AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse()); + //save clientProperties if (session.getClientProperties() == null) { @@ -86,7 +88,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< MethodRegistry methodRegistry = session.getMethodRegistry(); - switch (authResult.getStatus()) + switch (authResult.status) { case ERROR: Exception cause = authResult.getCause(); @@ -106,11 +108,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< break; case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject())); - } - session.setAuthorizedSubject(authResult.getSubject()); + _logger.info("Connected as: " + ss.getAuthorizationID()); + session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); @@ -122,7 +121,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< case CONTINUE: stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); - ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge); session.writeFrame(secureBody.generateFrame(0)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 0cfed77f2e..8939cfa334 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -106,7 +106,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar else { queue = createQueue(queueName, body, virtualHost, protocolConnection); - queue.setAuthorizationHolder(protocolConnection); + queue.setPrincipalHolder(protocolConnection); if (queue.isDurable() && !queue.isAutoDelete()) { store.createQueue(queue, body.getArguments()); @@ -119,7 +119,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (body.getExclusive()) { queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); + queue.setPrincipalHolder(protocolConnection); if(!body.getDurable()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java index 5e6a143d52..db2cc970b2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java @@ -22,11 +22,9 @@ package org.apache.qpid.server.information.management; import java.io.IOException; -import org.apache.qpid.common.QpidProperties; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.JMException; @@ -36,15 +34,12 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn { private String buildVersion; private String productVersion; - private ApplicationRegistry registry; - public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException + public ServerInformationMBean(String buildVersion, String productVersion) throws JMException { super(ServerInformation.class, ServerInformation.TYPE); - - registry = applicationRegistry; - buildVersion = QpidProperties.getBuildVersion(); - productVersion = QpidProperties.getReleaseVersion(); + this.buildVersion = buildVersion; + this.productVersion = productVersion; } public String getObjectInstanceName() @@ -72,75 +67,5 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn return productVersion; } - - public void resetStatistics() throws Exception - { - registry.resetStatistics(); - } - - public double getPeakMessageDeliveryRate() - { - return registry.getMessageDeliveryStatistics().getPeak(); - } - - public double getPeakDataDeliveryRate() - { - return registry.getDataDeliveryStatistics().getPeak(); - } - - public double getMessageDeliveryRate() - { - return registry.getMessageDeliveryStatistics().getRate(); - } - - public double getDataDeliveryRate() - { - return registry.getDataDeliveryStatistics().getRate(); - } - - public long getTotalMessagesDelivered() - { - return registry.getMessageDeliveryStatistics().getTotal(); - } - - public long getTotalDataDelivered() - { - return registry.getDataDeliveryStatistics().getTotal(); - } - - public double getPeakMessageReceiptRate() - { - return registry.getMessageReceiptStatistics().getPeak(); - } - - public double getPeakDataReceiptRate() - { - return registry.getDataReceiptStatistics().getPeak(); - } - - public double getMessageReceiptRate() - { - return registry.getMessageReceiptStatistics().getRate(); - } - - public double getDataReceiptRate() - { - return registry.getDataReceiptStatistics().getRate(); - } - - public long getTotalMessagesReceived() - { - return registry.getMessageReceiptStatistics().getTotal(); - } - - public long getTotalDataReceived() - { - return registry.getDataReceiptStatistics().getTotal(); - } - - public boolean isStatisticsEnabled() - { - return registry.isStatisticsEnabled(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties index 5d1e85fe41..6b83a7e7a5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties @@ -32,7 +32,4 @@ STOPPED = BRK-1005 : Stopped # 0 - path CONFIG = BRK-1006 : Using configuration : {0} # 0 - path -LOG_CONFIG = BRK-1007 : Using logging configuration : {0} - -STATS_DATA = BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total -STATS_MSGS = BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total
\ No newline at end of file +LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index ed8c0d0ce9..53bcd712f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -28,7 +28,3 @@ PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} # 0 - queue causing flow control FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0}) FLOW_REMOVED = CHN-1006 : Flow Control Removed -# Channel Transactions -# 0 - time in milliseconds -OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms -IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties index 3e640c7929..66bbefacb0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties @@ -20,7 +20,4 @@ # # 0 - name CREATED = VHT-1001 : Created : {0} -CLOSED = VHT-1002 : Closed - -STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total -STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total`
\ No newline at end of file +CLOSED = VHT-1002 : Closed
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 9b357403a8..f28873940b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -47,7 +47,7 @@ public class ChannelLogSubject extends AbstractLogSubject */ setLogStringWithFormat(CHANNEL_FORMAT, session.getSessionID(), - session.getAuthorizedPrincipal().getName(), + session.getPrincipal().getName(), session.getRemoteAddress(), session.getVirtualHost().getName(), channel.getChannelId()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java index c1c836f9b4..a697029d24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -56,7 +56,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { if (!_upToDate) { - if (_session.getAuthorizedPrincipal() != null) + if (_session.getPrincipal() != null) { if (_session.getVirtualHost() != null) { @@ -72,7 +72,7 @@ public class ConnectionLogSubject extends AbstractLogSubject */ _logString = "[" + MessageFormat.format(CONNECTION_FORMAT, _session.getSessionID(), - _session.getAuthorizedPrincipal().getName(), + _session.getPrincipal().getName(), _session.getRemoteAddress(), _session.getVirtualHost().getName()) + "] "; @@ -83,7 +83,7 @@ public class ConnectionLogSubject extends AbstractLogSubject { _logString = "[" + MessageFormat.format(USER_FORMAT, _session.getSessionID(), - _session.getAuthorizedPrincipal().getName(), + _session.getPrincipal().getName(), _session.getRemoteAddress()) + "] "; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java index e44b8c41cb..0a739af695 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java @@ -26,9 +26,8 @@ import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; -import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; /** * Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful @@ -37,14 +36,10 @@ import org.apache.qpid.server.registry.IApplicationRegistry; */ public abstract class DefaultManagedObject extends StandardMBean implements ManagedObject { - private static final Logger LOGGER = Logger.getLogger(ApplicationRegistry.class); - private Class<?> _managementInterface; private String _typeName; - private ManagedObjectRegistry _registry; - protected DefaultManagedObject(Class<?> managementInterface, String typeName) throws NotCompliantMBeanException { @@ -70,26 +65,23 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana public void register() throws JMException { - _registry = ApplicationRegistry.getInstance().getManagedObjectRegistry(); - _registry.registerObject(this); + getManagedObjectRegistry().registerObject(this); } - public void unregister() + protected ManagedObjectRegistry getManagedObjectRegistry() + { + return ApplicationRegistry.getInstance().getManagedObjectRegistry(); + } + + public void unregister() throws AMQException { try { - if(_registry != null) - { - _registry.unregisterObject(this); - } + getManagedObjectRegistry().unregisterObject(this); } catch (JMException e) { - LOGGER.error("Error unregistering managed object: " + this + ": " + e, e); - } - finally - { - _registry = null; + throw new AMQException("Error unregistering managed object: " + this + ": " + e, e); } } @@ -161,4 +153,32 @@ public abstract class DefaultManagedObject extends StandardMBean implements Mana return ""; } + protected static StringBuffer jmxEncode(StringBuffer jmxName, int attrPos) + { + for (int i = attrPos; i < jmxName.length(); i++) + { + if (jmxName.charAt(i) == ',') + { + jmxName.setCharAt(i, ';'); + } + else if (jmxName.charAt(i) == ':') + { + jmxName.setCharAt(i, '-'); + } + else if (jmxName.charAt(i) == '?' || + jmxName.charAt(i) == '*' || + jmxName.charAt(i) == '\\') + { + jmxName.insert(i, '\\'); + i++; + } + else if (jmxName.charAt(i) == '\n') + { + jmxName.insert(i, '\\'); + i++; + jmxName.setCharAt(i, 'n'); + } + } + return jmxName; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index 6a34ff4a26..0334a856c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -20,6 +20,32 @@ */ package org.apache.qpid.server.management; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; + +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.ObjectName; +import javax.management.NotificationListener; +import javax.management.NotificationFilterSupport; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.MBeanServerForwarder; +import javax.management.remote.JMXConnectionNotification; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.management.remote.rmi.RMIJRMPServerImpl; +import javax.management.remote.rmi.RMIServerImpl; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import javax.rmi.ssl.SslRMIServerSocketFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -38,31 +64,7 @@ import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMIServerSocketFactory; import java.rmi.server.UnicastRemoteObject; import java.util.HashMap; - -import javax.management.JMException; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; -import javax.management.NotificationFilterSupport; -import javax.management.NotificationListener; -import javax.management.ObjectName; -import javax.management.remote.JMXConnectionNotification; -import javax.management.remote.JMXConnectorServer; -import javax.management.remote.JMXServiceURL; -import javax.management.remote.MBeanServerForwarder; -import javax.management.remote.rmi.RMIConnectorServer; -import javax.management.remote.rmi.RMIJRMPServerImpl; -import javax.management.remote.rmi.RMIServerImpl; -import javax.rmi.ssl.SslRMIClientSocketFactory; -import javax.rmi.ssl.SslRMIServerSocketFactory; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator; +import java.util.Map; /** * This class starts up an MBeanserver. If out of the box agent has been enabled then there are no @@ -111,6 +113,12 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); int port = appRegistry.getConfiguration().getJMXManagementPort(); + //retrieve the Principal Database assigned to JMX authentication duties + String jmxDatabaseName = appRegistry.getConfiguration().getJMXPrincipalDatabase(); + Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases(); + PrincipalDatabase db = map.get(jmxDatabaseName); + + HashMap<String,Object> env = new HashMap<String,Object>(); //Socket factories for the RMIConnectorServer, either default or SLL depending on configuration RMIClientSocketFactory csf; @@ -192,8 +200,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry //add a JMXAuthenticator implementation the env map to authenticate the RMI based JMX connector server RMIPasswordAuthenticator rmipa = new RMIPasswordAuthenticator(); - rmipa.setAuthenticationManager(appRegistry.getAuthenticationManager()); - HashMap<String,Object> env = new HashMap<String,Object>(); + rmipa.setPrincipalDatabase(db); env.put(JMXConnectorServer.AUTHENTICATOR, rmipa); /* diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java index 68f7689283..964b5ed5a0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -26,6 +26,8 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.security.AccessControlContext; import java.security.AccessController; +import java.security.Principal; +import java.util.Properties; import java.util.Set; import javax.management.Attribute; @@ -42,6 +44,7 @@ import javax.management.remote.MBeanServerForwarder; import javax.security.auth.Subject; import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -49,15 +52,20 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; /** - * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. It delegates - * JMX access decisions to the SecurityPlugin. + * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements + * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite + * and admin users. */ public class MBeanInvocationHandlerImpl implements InvocationHandler, NotificationListener { private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class); + public final static String ADMIN = "admin"; + public final static String READWRITE = "readwrite"; + public final static String READONLY = "readonly"; private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate"; private MBeanServer _mbs; + private static Properties _userRoles = new Properties(); private static ManagementActor _logActor; public static MBeanServerForwarder newProxyInstance() @@ -129,13 +137,14 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler, Notificati Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class); if (principals == null || principals.isEmpty()) { - throw new SecurityException("Access denied: no JMX principal"); + throw new SecurityException("Access denied: no principal"); } - - // Save the subject - SecurityManager.setThreadSubject(subject); - - // Get the component, type and impact, which may be null + + // Save the principal + Principal principal = principals.iterator().next(); + SecurityManager.setThreadPrincipal(principal); + + // Get the component, type and impact, which may be null String type = getType(method, args); String vhost = getVirtualHost(method, args); int impact = getImpact(method, args); @@ -204,20 +213,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler, Notificati ObjectName object = (ObjectName) args[0]; String vhost = object.getKeyProperty("VirtualHost"); - if(vhost != null) - { - try - { - //if the name is quoted in the ObjectName, unquote it - vhost = ObjectName.unquote(vhost); - } - catch(IllegalArgumentException e) - { - //ignore, this just means the name is not quoted - //and can be left unchanged - } - } - return vhost; } return null; @@ -277,7 +272,7 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler, Notificati } catch (JMException ex) { - _logger.error("Unable to determine mbean impact for method : " + mbeanMethod, ex); + ex.printStackTrace(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java index 84a1642578..194835ac02 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java @@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter implements AMQMessageHeader private BasicContentHeaderProperties getProperties() { - return (BasicContentHeaderProperties) _contentHeaderBody.getProperties(); + return (BasicContentHeaderProperties) _contentHeaderBody.properties; } public String getCorrelationId() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java index 66cb7ed83b..30bea7b6e6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java @@ -161,7 +161,7 @@ public class MessageMetaData implements StorableMessageMetaData public boolean isPersistent() { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties()); + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties); return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } @@ -229,7 +229,7 @@ public class MessageMetaData implements StorableMessageMetaData { private BasicContentHeaderProperties getProperties() { - return (BasicContentHeaderProperties) getContentHeaderBody().getProperties(); + return (BasicContentHeaderProperties) getContentHeaderBody().properties; } public String getCorrelationId() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java index 804a9d5027..e7f9983fff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java @@ -27,5 +27,5 @@ public interface Plugin /** * Provide Configuration to this plugin */ - public void configure(ConfigurationPlugin config) throws ConfigurationException; + public void configure(ConfigurationPlugin config); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index c8a7b56ccb..a6bab017a1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -18,16 +18,8 @@ */ package org.apache.qpid.server.plugins; -import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP; -import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY; -import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY; -import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE; -import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE; -import static org.apache.felix.main.AutoProcessor.process; -import static org.osgi.framework.Constants.FRAMEWORK_STORAGE; -import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN; -import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT; -import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES; +import static org.apache.felix.framework.util.FelixConstants.*; +import static org.apache.felix.main.AutoProcessor.*; import java.io.File; import java.util.ArrayList; @@ -43,20 +35,18 @@ import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.TopicConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SecurityPluginFactory; import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.access.plugins.DenyAll; import org.apache.qpid.server.security.access.plugins.LegacyAccess; -import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection; import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; import org.osgi.framework.BundleActivator; @@ -73,7 +63,7 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String QPID_VER_SUFFIX = "version=0.13,"; + private static final String QPID_VER_SUFFIX = "version=0.9,"; private Framework _felix; @@ -82,7 +72,6 @@ public class PluginManager implements Closeable private ServiceTracker _configTracker = null; private ServiceTracker _virtualHostTracker = null; private ServiceTracker _policyTracker = null; - private ServiceTracker _authenticationManagerTracker = null; private Activator _activator; @@ -90,7 +79,6 @@ public class PluginManager implements Closeable private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>(); private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>(); - private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>(); public PluginManager(String pluginPath, String cachePath) throws Exception { @@ -109,8 +97,7 @@ public class PluginManager implements Closeable LegacyAccess.LegacyAccessConfiguration.FACTORY, new SlowConsumerDetectionConfigurationFactory(), new SlowConsumerDetectionPolicyConfigurationFactory(), - new SlowConsumerDetectionQueueConfigurationFactory(), - PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY)) + new SlowConsumerDetectionQueueConfigurationFactory())) { _configPlugins.put(configFactory.getParentPaths(), configFactory); } @@ -125,12 +112,6 @@ public class PluginManager implements Closeable _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory); } - for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList( - PrincipalDatabaseAuthenticationManager.FACTORY)) - { - _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory); - } - // Check the plugin directory path is set and exist if (pluginPath == null) { @@ -186,8 +167,7 @@ public class PluginManager implements Closeable "org.apache.commons.logging; version=1.0.0," + "org.apache.log4j; version=1.2.12," + "javax.management.openmbean; version=1.0.0," + - "javax.management; version=1.0.0," + - "javax.security.auth; version=1.0.0" + "javax.management; version=1.0.0" ); // No automatic shutdown hook @@ -251,9 +231,6 @@ public class PluginManager implements Closeable _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); _policyTracker.open(); - _authenticationManagerTracker = new ServiceTracker(_activator.getContext(), AuthenticationManagerPluginFactory.class.getName(), null); - _authenticationManagerTracker.open(); - _logger.info("Opened service trackers"); } @@ -324,11 +301,6 @@ public class PluginManager implements Closeable return getServices(_securityTracker, _securityPlugins); } - public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins() - { - return getServices(_authenticationManagerTracker, _authenticationManagerPlugins); - } - public void close() { if (_felix != null) @@ -341,7 +313,6 @@ public class PluginManager implements Closeable _configTracker.close(); _virtualHostTracker.close(); _policyTracker.close(); - _authenticationManagerTracker.close(); } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 061ebf50cd..bcda385f64 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -20,35 +20,14 @@ */ package org.apache.qpid.server.protocol; -import java.util.List; -import java.util.UUID; - -import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.AMQException; -public interface AMQConnectionModel extends StatisticsGatherer +public interface AMQConnectionModel { - /** - * get a unique id for this connection. - * - * @return a {@link UUID} representing the connection - */ - public UUID getId(); - - /** - * Close the underlying Connection - * - * @param cause - * @param message - * @throws org.apache.qpid.AMQException - */ - public void close(AMQConstant cause, String message) throws AMQException; /** * Close the given requested Session - * * @param session * @param cause * @param message @@ -57,16 +36,4 @@ public interface AMQConnectionModel extends StatisticsGatherer public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; public long getConnectionId(); - - /** - * Get a list of all sessions using this connection. - * - * @return a list of {@link AMQSessionModel}s - */ - public List<AMQSessionModel> getSessionModels(); - - /** - * Return a {@link LogSubject} for the connection. - */ - public LogSubject getLogSubject(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index d6201f7cf6..a1ffe272fd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -37,10 +38,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; -import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; +import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -89,14 +90,12 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { @@ -147,7 +146,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; - private Subject _authorizedSubject; + private Principal _authorizedID; private MethodDispatcher _dispatcher; private ProtocolSessionIdentifier _sessionIdentifier; @@ -157,6 +156,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private AMQPConnectionActor _actor; private LogSubject _logSubject; + private NetworkDriver _networkDriver; + private long _lastIoTime; private long _writtenBytes; @@ -171,28 +172,21 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private final UUID _id; private final ConfigStore _configStore; private long _createTime = System.currentTimeMillis(); - - private ApplicationRegistry _registry; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - - private final NetworkConnection _network; - private final Sender<ByteBuffer> _sender; public ManagedObject getManagedObject() { return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) { _stateManager = new AMQStateManager(virtualHostRegistry, this); + _networkDriver = driver; + _codecFactory = new AMQCodecFactory(true, this); _poolReference.acquireExecutorService(); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); - _network = network; - _sender = _network.getSender(); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -201,10 +195,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _configStore = virtualHostRegistry.getConfigStore(); _id = _configStore.createId(); + _actor.message(ConnectionMessages.OPEN(null, null, false, false)); - _registry = virtualHostRegistry.getApplicationRegistry(); - initialiseStatistics(); } private AMQProtocolSessionMBean createMBean() throws JMException @@ -370,14 +363,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol null, mechanisms.getBytes(), locales.getBytes()); - _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); + _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } } @@ -498,7 +491,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { public void run() { - _sender.send(buf); + _networkDriver.send(buf); } }); } @@ -690,8 +683,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _networkDriver.setMaxWriteIdle(delay); + _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -795,7 +788,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeProtocolSession() { - _sender.close(); + _networkDriver.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -808,7 +801,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public String toString() { - return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); + return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); } public String dump() @@ -830,11 +823,17 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol */ public String getLocalFQDN() { - SocketAddress address = _network.getLocalAddress(); + SocketAddress address = _networkDriver.getLocalAddress(); + // we use the vmpipe address in some tests hence the need for this rather ugly test. The host + // information is used by SASL primary. if (address instanceof InetSocketAddress) { return ((InetSocketAddress) address).getHostName(); } + else if (address instanceof VmPipeAddress) + { + return "vmpipe:" + ((VmPipeAddress) address).getPort(); + } else { throw new IllegalArgumentException("Unsupported socket address class: " + address); @@ -913,7 +912,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public Object getClientIdentifier() { - return _network.getRemoteAddress(); + return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; } public VirtualHost getVirtualHost() @@ -955,33 +954,29 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return _protocolOutputConverter; } - public void setAuthorizedSubject(final Subject authorizedSubject) + public void setAuthorizedID(Principal authorizedID) { - if (authorizedSubject == null) - { - throw new IllegalArgumentException("authorizedSubject cannot be null"); - } - _authorizedSubject = authorizedSubject; + _authorizedID = authorizedID; } - - public Subject getAuthorizedSubject() + + public Principal getAuthorizedID() { - return _authorizedSubject; + return _authorizedID; } - - public Principal getAuthorizedPrincipal() + + public Principal getPrincipal() { - return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); + return _authorizedID; } public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -1011,9 +1006,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol // Nothing } + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + public void writerIdle() { - _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); } public void exception(Throwable throwable) @@ -1021,7 +1021,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _sender.close(); + _networkDriver.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1039,7 +1039,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol writeFrame(closeBody.generateFrame(0)); - _sender.close(); + _networkDriver.close(); } } @@ -1078,6 +1078,19 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return (_clientVersion == null) ? null : _clientVersion.toString(); } + public void closeIfLingeringClosedChannels() + { + for (Entry<Integer, Long>id : _closingChannelsList.entrySet()) + { + if (id.getValue() + 30000 > System.currentTimeMillis()) + { + // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection + _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); + closeProtocolSession(); + } + } + } + public Boolean isIncoming() { return true; @@ -1095,7 +1108,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public String getAuthId() { - return getAuthorizedPrincipal().getName(); + return getAuthorizedID().getName(); } public Integer getRemotePID() @@ -1250,6 +1263,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { + closeChannel((Integer)session.getID()); MethodRegistry methodRegistry = getMethodRegistry(); @@ -1260,97 +1274,5 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol 0,0); writeFrame(responseBody.generateFrame((Integer)session.getID())); - } - - public void close(AMQConstant cause, String message) throws AMQException - { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getProtocolOutputConverter().getProtocolMajorVersion(), - getProtocolOutputConverter().getProtocolMinorVersion(), - (Throwable) null), true); - } - - public List<AMQSessionModel> getSessionModels() - { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); - for (AMQChannel channel : getChannels()) - { - sessions.add((AMQSessionModel) channel); - } - return sessions; - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _virtualHost.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _virtualHost.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); - _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); - _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java index 94870c98bd..0e4444725e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -25,7 +25,7 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.NetworkDriver; public class AMQProtocolEngineFactory implements ProtocolEngineFactory { @@ -38,12 +38,13 @@ public class AMQProtocolEngineFactory implements ProtocolEngineFactory public AMQProtocolEngineFactory(Integer port) { - _vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry(); + _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry(); } - public ProtocolEngine newProtocolEngine(NetworkConnection network) + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) { - return new AMQProtocolEngine(_vhosts, network); + return new AMQProtocolEngine(_vhosts, networkDriver); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 9116bf2767..f48a214933 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol; -import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.qpid.AMQException; @@ -29,15 +28,16 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.security.Principal; import java.util.List; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel { long getSessionID(); @@ -205,7 +205,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public ProtocolOutputConverter getProtocolOutputConverter(); - void setAuthorizedSubject(Subject authorizedSubject); + void setAuthorizedID(Principal authorizedID); public java.net.SocketAddress getRemoteAddress(); @@ -231,5 +231,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth List<AMQChannel> getChannels(); + void closeIfLingeringClosedChannels(); + void mgmtCloseChannel(int channelId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 16d99de492..f4f2cab2c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -37,15 +37,25 @@ */ package org.apache.qpid.server.protocol; -import java.util.Date; -import java.util.List; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.NotCompliantMBeanException; import javax.management.Notification; -import javax.management.ObjectName; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -56,20 +66,8 @@ import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.management.common.mbeans.ManagedConnection; -import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; -import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; +import java.util.Date; +import java.util.List; /** * This MBean class implements the management interface. In order to make more attributes, operations and notifications @@ -96,7 +94,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed super(ManagedConnection.class, ManagedConnection.TYPE); _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); - _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote; + remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; + _name = jmxEncode(new StringBuffer(remote), 0).toString(); init(); } @@ -131,7 +130,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getAuthorizedId() { - return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null; + return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null; } public String getVersion() @@ -176,7 +175,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getObjectInstanceName() { - return ObjectName.quote(_name); + return _name; } /** @@ -340,78 +339,4 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed _broadcaster.sendNotification(n); } - public void resetStatistics() throws Exception - { - _protocolSession.resetStatistics(); - } - - public double getPeakMessageDeliveryRate() - { - return _protocolSession.getMessageDeliveryStatistics().getPeak(); - } - - public double getPeakDataDeliveryRate() - { - return _protocolSession.getDataDeliveryStatistics().getPeak(); - } - - public double getMessageDeliveryRate() - { - return _protocolSession.getMessageDeliveryStatistics().getRate(); - } - - public double getDataDeliveryRate() - { - return _protocolSession.getDataDeliveryStatistics().getRate(); - } - - public long getTotalMessagesDelivered() - { - return _protocolSession.getMessageDeliveryStatistics().getTotal(); - } - - public long getTotalDataDelivered() - { - return _protocolSession.getDataDeliveryStatistics().getTotal(); - } - - public double getPeakMessageReceiptRate() - { - return _protocolSession.getMessageReceiptStatistics().getPeak(); - } - - public double getPeakDataReceiptRate() - { - return _protocolSession.getDataReceiptStatistics().getPeak(); - } - - public double getMessageReceiptRate() - { - return _protocolSession.getMessageReceiptStatistics().getRate(); - } - - public double getDataReceiptRate() - { - return _protocolSession.getDataReceiptStatistics().getRate(); - } - - public long getTotalMessagesReceived() - { - return _protocolSession.getMessageReceiptStatistics().getTotal(); - } - - public long getTotalDataReceived() - { - return _protocolSession.getDataReceiptStatistics().getTotal(); - } - - public boolean isStatisticsEnabled() - { - return _protocolSession.isStatisticsEnabled(); - } - - public void setStatisticsEnabled(boolean enabled) - { - _protocolSession.setStatisticsEnabled(enabled); - } -} +} // End of MBean class diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index bc63403a86..a9b2354d75 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,35 +20,15 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; public interface AMQSessionModel { - public Object getID(); + Object getID(); - public AMQConnectionModel getConnectionModel(); + AMQConnectionModel getConnectionModel(); - public String getClientID(); - - public void close() throws AMQException; + String getClientID(); - public LogSubject getLogSubject(); - - /** - * This method is called from the housekeeping thread to check the status of - * transactions on this session and react appropriately. - * - * If a transaction is open for too long or idle for too long then a warning - * is logged or the connection is closed, depending on the configuration. An open - * transaction is one that has recent activity. The transaction age is counted - * from the time the transaction was started. An idle transaction is one that - * has had no activity, such as publishing or acknowledgeing messages. - * - * @param openWarn time in milliseconds before alerting on open transaction - * @param openClose time in milliseconds before closing connection with open transaction - * @param idleWarn time in milliseconds before alerting on idle transaction - * @param idleClose time in milliseconds before closing connection with idle transaction - */ - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; + LogSubject getLogSubject(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 01b12b44ce..eb957ee33c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.NetworkDriver; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -37,24 +37,28 @@ public class MultiVersionProtocolEngine implements ProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); - private Set<AmqpProtocolVersion> _supported; + + + private NetworkDriver _networkDriver; + private Set<VERSION> _supported; private String _fqdn; private IApplicationRegistry _appRegistry; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; - + private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, String fqdn, - Set<AmqpProtocolVersion> supported, - NetworkConnection network) + Set<VERSION> supported, NetworkDriver networkDriver) { _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _network = network; - _sender = _network.getSender(); + _networkDriver = networkDriver; + } + + public void setNetworkDriver(NetworkDriver driver) + { + _delegate.setNetworkDriver(driver); } public SocketAddress getRemoteAddress() @@ -151,7 +155,7 @@ private static final byte[] AMQP_0_9_1_HEADER = private static interface DelegateCreator { - AmqpProtocolVersion getVersion(); + VERSION getVersion(); byte[] getHeaderIdentifier(); ProtocolEngine getProtocolEngine(); } @@ -159,9 +163,9 @@ private static final byte[] AMQP_0_9_1_HEADER = private DelegateCreator creator_0_8 = new DelegateCreator() { - public AmqpProtocolVersion getVersion() + public VERSION getVersion() { - return AmqpProtocolVersion.v0_8; + return VERSION.v0_8; } public byte[] getHeaderIdentifier() @@ -171,16 +175,16 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); } }; private DelegateCreator creator_0_9 = new DelegateCreator() { - public AmqpProtocolVersion getVersion() + public VERSION getVersion() { - return AmqpProtocolVersion.v0_9; + return VERSION.v0_9; } @@ -191,16 +195,16 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); } }; private DelegateCreator creator_0_9_1 = new DelegateCreator() { - public AmqpProtocolVersion getVersion() + public VERSION getVersion() { - return AmqpProtocolVersion.v0_9_1; + return VERSION.v0_9_1; } @@ -211,7 +215,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); } }; @@ -219,9 +223,9 @@ private static final byte[] AMQP_0_9_1_HEADER = private DelegateCreator creator_0_10 = new DelegateCreator() { - public AmqpProtocolVersion getVersion() + public VERSION getVersion() { - return AmqpProtocolVersion.v0_10; + return VERSION.v0_10; } @@ -238,7 +242,7 @@ private static final byte[] AMQP_0_9_1_HEADER = ServerConnection conn = new ServerConnection(); conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _network, _appRegistry); + return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry); } }; @@ -248,14 +252,19 @@ private static final byte[] AMQP_0_9_1_HEADER = private class ClosedDelegateProtocolEngine implements ProtocolEngine { + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } public long getWrittenBytes() @@ -296,16 +305,22 @@ private static final byte[] AMQP_0_9_1_HEADER = private class SelfDelegateProtocolEngine implements ProtocolEngine { + private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } public long getWrittenBytes() @@ -365,12 +380,14 @@ private static final byte[] AMQP_0_9_1_HEADER = // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - _sender.send(ByteBuffer.wrap(newestSupported)); + _networkDriver.send(ByteBuffer.wrap(newestSupported)); _delegate = new ClosedDelegateProtocolEngine(); } else { + newDelegate.setNetworkDriver(_networkDriver); + _delegate = newDelegate; _header.flip(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 96d46353c6..75358c42d9 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,22 +20,28 @@ */ package org.apache.qpid.server.protocol; -import java.util.EnumSet; -import java.util.Set; - -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.transport.network.NetworkConnection; + +import java.util.Set; +import java.util.Arrays; +import java.util.HashSet; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class); + ; + + + public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; + + private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); private final IApplicationRegistry _appRegistry; private final String _fqdn; - private final Set<AmqpProtocolVersion> _supported; + private final Set<VERSION> _supported; public MultiVersionProtocolEngineFactory() @@ -43,7 +49,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory this(1, "localhost", ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> versions) + public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions) { this(1, fqdn, versions); } @@ -54,16 +60,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory this(1, fqdn, ALL_VERSIONS); } - public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<AmqpProtocolVersion> supportedVersions) + public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions) { - _appRegistry = ApplicationRegistry.getInstance(); + _appRegistry = ApplicationRegistry.getInstance(instance); _fqdn = fqdn; _supported = supportedVersions; } - public ProtocolEngine newProtocolEngine(NetworkConnection network) + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 8c62441d59..30d506a89b 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,12 +21,13 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -37,7 +38,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - private NetworkConnection _network; + private NetworkDriver _networkDriver; private long _readBytes; private long _writtenBytes; private ServerConnection _connection; @@ -46,17 +47,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine private long _createTime = System.currentTimeMillis(); public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, + NetworkDriver networkDriver, final IApplicationRegistry appRegistry) { super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _network = network; + _networkDriver = networkDriver; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); + // FIXME Two log messages to maintain compatinbility with earlier protocol versions + _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); + _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE); + _connection.setSender(dis); _connection.onOpen(new Runnable() { public void run() @@ -65,19 +75,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine } }); - // FIXME Two log messages to maintain compatibility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } public long getReadBytes() @@ -127,7 +134,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine public String getAuthId() { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); + return _connection.getAuthorizationID(); } public String getRemoteProcessName() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 371ae0de50..b6e97e08fb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -60,7 +60,7 @@ public class AMQPriorityQueue extends SimpleAMQQueue { // check that all subscriptions are not in advance of the entry SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && entry.isAvailable()) + while(subIter.advance() && !entry.isAcquired()) { final Subscription subscription = subIter.getNode().getSubscription(); if(!subscription.isClosed()) @@ -70,7 +70,7 @@ public class AMQPriorityQueue extends SimpleAMQQueue { QueueEntry subnode = context._lastSeenEntry; QueueEntry released = context._releasedEntry; - while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0)) + while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0)) { if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9140a13625..de9dc42de8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,18 +21,21 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfig; +import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; @@ -69,8 +72,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer boolean isAutoDelete(); AMQShortString getOwner(); - AuthorizationHolder getAuthorizationHolder(); - void setAuthorizationHolder(AuthorizationHolder principalHolder); + PrincipalHolder getPrincipalHolder(); + void setPrincipalHolder(PrincipalHolder principalHolder); void setExclusiveOwningSession(AMQSessionModel owner); AMQSessionModel getExclusiveOwningSession(); @@ -105,16 +108,23 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer boolean isDeleted(); + int delete() throws AMQException; + void requeue(QueueEntry entry); + void requeue(QueueEntryImpl storeContext, Subscription subscription); + void dequeue(QueueEntry entry, Subscription sub); void decrementUnackedMsgCount(); + boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; + + void addQueueDeleteTask(final Task task); void removeQueueDeleteTask(final Task task); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index c8eb118b11..b5294b6d2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -43,7 +43,6 @@ import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; -import javax.management.ObjectName; import javax.management.OperationsException; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.ArrayType; @@ -98,7 +97,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { super(ManagedQueue.class, ManagedQueue.TYPE); _queue = queue; - _queueName = queue.getName(); + _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString(); } public ManagedObject getParentObject() @@ -148,7 +147,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public String getObjectInstanceName() { - return ObjectName.quote(_queueName); + return _queueName; } public String getName() @@ -507,7 +506,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) { List<String> list = new ArrayList<String>(); - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties(); + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; list.add("reply-to = " + headerProperties.getReplyToAsString()); list.add("propertyFlags = " + headerProperties.getPropertyFlags()); list.add("ApplicationID = " + headerProperties.getAppIdAsString()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 3e3288404f..2d2fb3a214 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -96,9 +96,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public void setExpiration() { long expiration = - ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); + ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration(); long timestamp = - ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp(); + ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp(); if (SYNCHED_CLOCKS) { @@ -193,8 +193,8 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public boolean isPersistent() { - return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == + return getContentHeader().properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 88349586c3..edd1e0bdc3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -52,17 +52,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable } public abstract State getState(); - - /** - * Returns true if state is either DEQUEUED or DELETED. - * - * @return true if state is either DEQUEUED or DELETED. - */ - public boolean isDispensed() - { - State currentState = getState(); - return currentState == State.DEQUEUED || currentState == State.DELETED; - } } @@ -206,6 +195,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable boolean isRejectedBy(Subscription subscription); + void requeue(Subscription subscription); + void dequeue(); void dispose(); @@ -218,18 +209,4 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); - - /** - * Returns true if entry is in DEQUEUED state, otherwise returns false. - * - * @return true if entry is in DEQUEUED state, otherwise returns false - */ - boolean isDequeued(); - - /** - * Returns true if entry is either DEQUED or DELETED state. - * - * @return true if entry is either DEQUED or DELETED state - */ - boolean isDispensed(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e951b3dfd4..1ba4f4d89b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -358,6 +358,15 @@ public class QueueEntryImpl implements QueueEntry } } + public void requeue(Subscription subscription) + { + getQueue().requeue(this, subscription); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } + } + public void dequeue() { EntryState state = _state; @@ -499,7 +508,7 @@ public class QueueEntryImpl implements QueueEntry { QueueEntryImpl next = nextNode(); - while(next != null && next.isDispensed() ) + while(next != null && next.isDeleted()) { final QueueEntryImpl newNext = next.nextNode(); @@ -547,14 +556,4 @@ public class QueueEntryImpl implements QueueEntry return _queueEntryList; } - public boolean isDequeued() - { - return _state == DEQUEUED_STATE; - } - - public boolean isDispensed() - { - return _state.isDispensed(); - } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java deleted file mode 100644 index 7e1d57e205..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.queue.QueueRunner; -import org.apache.qpid.server.queue.SimpleAMQQueue; - -/** - * QueueRunners are Runnables used to process a queue when requiring - * asynchronous message delivery to subscriptions, which is necessary - * when straight-through delivery of a message to a subscription isn't - * possible during the enqueue operation. - */ -public class QueueRunner implements ReadWriteRunnable -{ - private static final Logger _logger = Logger.getLogger(QueueRunner.class); - - private final String _name; - private final SimpleAMQQueue _queue; - - public QueueRunner(SimpleAMQQueue queue, long count) - { - _queue = queue; - _name = "QueueRunner-" + count + "-" + queue.getLogActor(); - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_queue.getLogActor()); - - _queue.processQueue(this); - } - catch (AMQException e) - { - _logger.error("Exception during asynchronous delivery by " + _name, e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } -}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a095ef47ea..b003152db6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -83,7 +83,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** null means shared */ private final AMQShortString _owner; - private AuthorizationHolder _authorizationHolder; + private PrincipalHolder _prinicpalHolder; private boolean _exclusive = false; private AMQSessionModel _exclusiveOwner; @@ -102,7 +102,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener protected final QueueEntryList _entries; - protected final SubscriptionList _subscriptionList = new SubscriptionList(); + protected final SubscriptionList _subscriptionList = new SubscriptionList(this); + + private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead()); private volatile Subscription _exclusiveSubscriber; @@ -371,14 +373,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _owner; } - public AuthorizationHolder getAuthorizationHolder() + public PrincipalHolder getPrincipalHolder() { - return _authorizationHolder; + return _prinicpalHolder; } - public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder) + public void setPrincipalHolder(PrincipalHolder prinicpalHolder) { - _authorizationHolder = authorizationHolder; + _prinicpalHolder = prinicpalHolder; } @@ -600,25 +602,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get(); + SubscriptionList.SubscriptionNode nextNode = node.getNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _subscriptionList.getHead().getNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_lastSubscriptionNode.compareAndSet(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); - nextNode = node.findNext(); + node = _lastSubscriptionNode.get(); + nextNode = node.getNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _subscriptionList.getHead().getNext(); } } } @@ -627,7 +629,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // this catches the case where we *just* miss an update int loops = 2; - while (entry.isAvailable() && loops != 0) + while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0) { if (nextNode == null) { @@ -640,13 +642,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener Subscription sub = nextNode.getSubscription(); deliverToSubscription(sub, entry); } - nextNode = nextNode.findNext(); + nextNode = nextNode.getNext(); } } - if (entry.isAvailable()) + if (!(entry.isAcquired() || entry.isDeleted())) { checkSubscriptionsNotAheadOfDelivery(entry); @@ -803,6 +805,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void requeue(QueueEntryImpl entry, Subscription subscription) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + } + public void dequeue(QueueEntry entry, Subscription sub) { decrementQueueCount(); @@ -940,7 +960,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node != null && !node.isDispensed()) + if (node != null && !node.isDeleted()) { entryList.add(node); } @@ -1044,7 +1064,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); - if (!node.isDispensed() && filter.accept(node)) + if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); } @@ -1238,6 +1258,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if ((messageId >= fromMessageId) && (messageId <= toMessageId) + && !node.isDeleted() && node.acquire()) { dequeueEntry(node); @@ -1267,7 +1288,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (noDeletes && queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node); noDeletes = false; @@ -1297,7 +1318,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - if (node.acquire()) + if (!node.isDeleted() && node.acquire()) { dequeueEntry(node, txn); if(++count == request) @@ -1564,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1583,6 +1604,52 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } + + private class Runner implements ReadWriteRunnable + { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + + public void run() + { + String originalName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(_name); + CurrentActor.set(_logActor); + + processQueue(this); + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + Thread.currentThread().setName(originalName); + } + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + + public String toString() + { + return _name; + } + } + public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1651,7 +1718,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = getNextAvailableEntry(sub); - if (node != null && node.isAvailable()) + if (node != null && !(node.isAcquired() || node.isDeleted())) { if (sub.hasInterest(node)) { @@ -1712,7 +1779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node))) { if (expired) { @@ -1741,40 +1808,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - * @throws AMQException - */ - public void processQueue(QueueRunner runner) throws AMQException + private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; + int extraLoops = 1; + long iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1791,14 +1832,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; + extraLoops = 1; } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + deliveryIncomplete = _subscriptionList.size() != 0; + boolean done; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1808,25 +1847,30 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub); - if (subscriptionDone) + + done = attemptDelivery(sub); + + if (done) { - //close autoClose subscriptions if we are not currently intent on continuing - if (lastLoop && sub.isAutoClose()) + if (extraLoops == 0) { - unregisterSubscription(sub); + deliveryIncomplete = false; + if (sub.isAutoClose()) + { + unregisterSubscription(sub); - sub.confirmAutoClose(); + sub.confirmAutoClose(); + } + } + else + { + extraLoops--; } } else { - //this subscription can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; - lastLoop = false; iterations--; + extraLoops = 1; } } finally @@ -1834,34 +1878,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } - - if(allSubscriptionsDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allSubscriptionsDone) - { - //All subscriptions reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; - lastLoop = true; - } - else - { - //some subscriptions can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - _asynchronousRunner.set(null); } - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit + // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { @@ -1881,8 +1901,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted and not dequeued - if (!node.isDispensed()) + // Only process nodes that are not currently deleted + if (!node.isDeleted()) { // If the node has exired then aquire it if (node.expired() && node.acquire()) @@ -2222,9 +2242,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } - - public LogActor getLogActor() - { - return _logActor; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 46baab8c85..b97c2c55c5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,5 +1,6 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -155,7 +156,7 @@ public class SimpleQueueEntryList implements QueueEntryList if(!atTail()) { QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDispensed() && nextNode.nextNode() != null) + while(nextNode.isDeleted() && nextNode.nextNode() != null) { nextNode = nextNode.nextNode(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 7c804fc1fd..78a642f22f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -21,14 +21,9 @@ package org.apache.qpid.server.registry; import java.net.InetSocketAddress; -import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; @@ -46,27 +41,24 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.AbstractRootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; -import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration; +import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory; -import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; - /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -77,10 +69,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null); + private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>(); protected final ServerConfiguration _configuration; + public static final int DEFAULT_INSTANCE = 1; + protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>(); protected ManagedObjectRegistry _managedObjectRegistry; @@ -91,6 +85,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected SecurityManager _securityManager; + protected PrincipalDatabaseManager _databaseManager; + protected PluginManager _pluginManager; protected ConfigurationManager _configurationManager; @@ -106,10 +102,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private BrokerConfig _broker; private ConfigStore _configStore; - - private Timer _reportingTimer; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + protected String _registryName; static { @@ -120,54 +114,53 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { public void run() { - remove(); + removeAll(); } } public static void initialise(IApplicationRegistry instance) throws Exception { - if(instance == null) - { - throw new IllegalArgumentException("ApplicationRegistry instance must not be null"); - } + initialise(instance, DEFAULT_INSTANCE); + } - if(!_instance.compareAndSet(null, instance)) + @SuppressWarnings("finally") + public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception + { + if (instance != null) { - throw new IllegalStateException("An ApplicationRegistry is already initialised"); - } - - _logger.info("Initialising Application Registry(" + instance + ")"); + _logger.info("Initialising Application Registry(" + instance + "):" + instanceID); + _instanceMap.put(instanceID, instance); + final ConfigStore store = ConfigStore.newInstance(); + store.setRoot(new SystemConfigImpl(store)); + instance.setConfigStore(store); - final ConfigStore store = ConfigStore.newInstance(); - store.setRoot(new SystemConfigImpl(store)); - instance.setConfigStore(store); + BrokerConfig broker = new BrokerConfigAdapter(instance); - BrokerConfig broker = new BrokerConfigAdapter(instance); + SystemConfig system = (SystemConfig) store.getRoot(); + system.addBroker(broker); + instance.setBroker(broker); - SystemConfig system = (SystemConfig) store.getRoot(); - system.addBroker(broker); - instance.setBroker(broker); - - try - { - instance.initialise(); - } - catch (Exception e) - { - _instance.set(null); - - //remove the Broker instance, then re-throw try { - system.removeBroker(broker); + instance.initialise(instanceID); } - catch(Throwable t) + catch (Exception e) { - //ignore + _instanceMap.remove(instanceID); + try + { + system.removeBroker(broker); + } + finally + { + throw e; + } } - - throw e; + } + else + { + remove(instanceID); } } @@ -183,19 +176,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static boolean isConfigured() { - return _instance.get() != null; + return isConfigured(DEFAULT_INSTANCE); + } + + public static boolean isConfigured(int instanceID) + { + return _instanceMap.containsKey(instanceID); } + /** Method to cleanly shutdown the default registry running in this JVM */ public static void remove() { - IApplicationRegistry instance = _instance.getAndSet(null); + remove(DEFAULT_INSTANCE); + } + + /** + * Method to cleanly shutdown specified registry running in this JVM + * + * @param instanceID the instance to shutdown + */ + public static void remove(int instanceID) + { try { + IApplicationRegistry instance = _instanceMap.get(instanceID); if (instance != null) { if (_logger.isInfoEnabled()) { - _logger.info("Shutting down ApplicationRegistry(" + instance + ")"); + _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance); } instance.close(); instance.getBroker().getSystem().removeBroker(instance.getBroker()); @@ -203,7 +212,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { - _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e); + _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e); + } + finally + { + _instanceMap.remove(instanceID); + } + } + + /** Method to cleanly shutdown all registries currently running in this JVM */ + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); } } @@ -228,10 +251,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _configuration.initialise(); } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { //Create the RootLogger to be used during broker operation _rootMessageLogger = new Log4jMessageLogger(_configuration); + _registryName = String.valueOf(instanceID); //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; @@ -253,7 +277,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _securityManager = new SecurityManager(_configuration, _pluginManager); - _authenticationManager = createAuthenticationManager(); + createDatabaseManager(_configuration); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + _databaseManager.initialiseManagement(_configuration); _managedObjectRegistry.start(); } @@ -266,8 +294,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { initialiseVirtualHosts(); - initialiseStatistics(); - initialiseStatisticsReporting(); } finally { @@ -276,51 +302,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - /** - * Iterates across all discovered authentication manager factories, offering the security configuration to each. - * Expects <b>exactly</b> one authentication manager to configure and initialise itself. - * - * It is an error to configure more than one authentication manager, or to configure none. - * - * @return authentication manager - * @throws ConfigurationException - */ - protected AuthenticationManager createAuthenticationManager() throws ConfigurationException + protected void createDatabaseManager(ServerConfiguration configuration) throws Exception { - final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName()); - final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values(); - - if (factories.size() == 0) - { - throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" + - "manager plugin has been placed in the plugins directory."); - } - - AuthenticationManager authMgr = null; - - for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();) - { - final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next(); - final AuthenticationManager tmp = factory.newInstance(securityConfiguration); - if (tmp != null) - { - if (authMgr != null) - { - throw new ConfigurationException("Cannot configure more than one authentication manager." - + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured." - + " Remove configuration for one of the authentication manager, or remove the plugin JAR" - + " from the classpath."); - } - authMgr = tmp; - } - } - - if (authMgr == null) - { - throw new ConfigurationException("No authentication managers configured within the configure file."); - } - - return authMgr; + _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); } protected void initialiseVirtualHosts() throws Exception @@ -336,88 +320,26 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _managedObjectRegistry = new NoopManagedObjectRegistry(); } - - public void initialiseStatisticsReporting() - { - long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms - final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled(); - final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled(); - final boolean reset = _configuration.isStatisticsReportResetEnabled(); - - /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ - if (report > 0L && (broker || virtualhost)) - { - _reportingTimer = new Timer("Statistics-Reporting", true); - - class StatisticsReportingTask extends TimerTask - { - private final int DELIVERED = 0; - private final int RECEIVED = 1; - - public void run() - { - CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { - public String getLogMessage() - { - return "[" + Thread.currentThread().getName() + "] "; - } - }); - - if (broker) - { - CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - } - - if (virtualhost) - { - for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) - { - String name = vhost.getName(); - StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); - } - } - - if (reset) - { - resetStatistics(); - } - - CurrentActor.remove(); - } - } - _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(), - report / 2, - report); - } + public static IApplicationRegistry getInstance() + { + return getInstance(DEFAULT_INSTANCE); } - /** - * Get the ApplicationRegistry - * @return the IApplicationRegistry instance - * @throws IllegalStateException if no registry instance has been initialised. - */ - public static IApplicationRegistry getInstance() throws IllegalStateException + public static IApplicationRegistry getInstance(int instanceID) { - IApplicationRegistry iApplicationRegistry = _instance.get(); - if (iApplicationRegistry == null) - { - throw new IllegalStateException("No ApplicationRegistry has been initialised"); - } - else + synchronized (IApplicationRegistry.class) { - return iApplicationRegistry; + IApplicationRegistry instance = _instanceMap.get(instanceID); + + if (instance == null) + { + throw new IllegalStateException("Application Registry (" + instanceID + ") not created"); + } + else + { + return instance; + } } } @@ -447,12 +369,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _logger.info("Shutting down ApplicationRegistry:" + this); } - - //Stop Statistics Reporting - if (_reportingTimer != null) - { - _reportingTimer.cancel(); - } //Stop incoming connections unbind(); @@ -460,6 +376,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry //Shutdown virtualhosts close(_virtualHostRegistry); +// close(_accessManager); +// +// close(_databaseManager); + close(_authenticationManager); close(_managedObjectRegistry); @@ -481,7 +401,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - acceptor.getNetworkTransport().close(); + acceptor.getNetworkDriver().close(); } catch (Throwable e) { @@ -521,6 +441,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _managedObjectRegistry; } + public PrincipalDatabaseManager getDatabaseManager() + { + return _databaseManager; + } + public AuthenticationManager getAuthenticationManager() { return _authenticationManager; @@ -573,76 +498,4 @@ public abstract class ApplicationRegistry implements IApplicationRegistry getBroker().addVirtualHost(virtualHost); return virtualHost; } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) - { - vhost.resetStatistics(); - } - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - getConfiguration().isStatisticsGenerationBrokerEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered"); - _dataDelivered = new StatisticsCounter("bytes-delivered"); - _messagesReceived = new StatisticsCounter("messages-received"); - _dataReceived = new StatisticsCounter("bytes-received"); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 108533ef96..4a4253153c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -71,7 +71,7 @@ public class BrokerConfigAdapter implements BrokerConfig public Integer getWorkerThreads() { - return _instance.getConfiguration().getConnectorProcessors(); + return _instance.getConfiguration().getProcessors(); } public Integer getMaxConnections() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index c27e0d19ec..228c3b9112 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -33,20 +33,21 @@ import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public interface IApplicationRegistry extends StatisticsGatherer +public interface IApplicationRegistry { /** * Initialise the application registry. All initialisation must be done in this method so that any components * that need access to the application registry itself for initialisation are able to use it. Attempting to * initialise in the constructor will lead to failures since the registry reference will not have been set. + * @param instanceID the instanceID that we can use to identify this AR. */ - void initialise() throws Exception; + void initialise(int instanceID) throws Exception; /** * Shutdown this Registry @@ -62,6 +63,8 @@ public interface IApplicationRegistry extends StatisticsGatherer ManagedObjectRegistry getManagedObjectRegistry(); + PrincipalDatabaseManager getDatabaseManager(); + AuthenticationManager getAuthenticationManager(); VirtualHostRegistry getVirtualHostRegistry(); @@ -94,6 +97,4 @@ public interface IApplicationRegistry extends StatisticsGatherer ConfigStore getConfigStore(); void setConfigStore(ConfigStore store); - - void initialiseStatisticsReporting(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java deleted file mode 100755 index 3d8c77a86f..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security; - -import java.security.Principal; - -import javax.security.auth.Subject; - -import org.apache.qpid.server.security.auth.sasl.GroupPrincipal; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; - -/** - * Represents the authorization of the logged on user. - * - */ -public interface AuthorizationHolder -{ - /** - * Returns the {@link Subject} of the authorized user. This is guaranteed to - * contain at least one {@link UsernamePrincipal}, representing the the identity - * used when the user logged on to the application, and zero or more {@link GroupPrincipal} - * representing the group(s) to which the user belongs. - * - * @return the Subject - */ - Subject getAuthorizedSubject(); - - /** - * Returns the {@link Principal} representing the the identity - * used when the user logged on to the application. - * - * @return a Principal - */ - Principal getAuthorizedPrincipal(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java index e925d7a1ec..7e93623cab 100644..100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java @@ -18,6 +18,12 @@ * under the License. * */ -package org.apache.qpid.server.protocol; +package org.apache.qpid.server.security; -public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10 }
\ No newline at end of file +import java.security.Principal; + +public interface PrincipalHolder +{ + /** @return a Principal that was used to authorized this session */ + Principal getPrincipal(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index f582fed6a0..f18c327692 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -18,19 +18,8 @@ */ package org.apache.qpid.server.security; -import static org.apache.qpid.server.security.access.ObjectType.EXCHANGE; -import static org.apache.qpid.server.security.access.ObjectType.METHOD; -import static org.apache.qpid.server.security.access.ObjectType.OBJECT; -import static org.apache.qpid.server.security.access.ObjectType.QUEUE; -import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST; -import static org.apache.qpid.server.security.access.Operation.ACCESS; -import static org.apache.qpid.server.security.access.Operation.BIND; -import static org.apache.qpid.server.security.access.Operation.CONSUME; -import static org.apache.qpid.server.security.access.Operation.CREATE; -import static org.apache.qpid.server.security.access.Operation.DELETE; -import static org.apache.qpid.server.security.access.Operation.PUBLISH; -import static org.apache.qpid.server.security.access.Operation.PURGE; -import static org.apache.qpid.server.security.access.Operation.UNBIND; +import static org.apache.qpid.server.security.access.ObjectType.*; +import static org.apache.qpid.server.security.access.Operation.*; import java.net.SocketAddress; import java.security.Principal; @@ -40,8 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import javax.security.auth.Subject; - import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; @@ -50,9 +37,11 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; /** * The security manager contains references to all loaded {@link SecurityPlugin}s and delegates security decisions to them based @@ -66,7 +55,7 @@ public class SecurityManager private static final Logger _logger = Logger.getLogger(SecurityManager.class); /** Container for the {@link Principal} that is using to this thread. */ - private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>(); + private static final ThreadLocal<Principal> _principal = new ThreadLocal<Principal>(); private PluginManager _pluginManager; private Map<String, SecurityPluginFactory> _pluginFactories = new HashMap<String, SecurityPluginFactory>(); @@ -137,14 +126,19 @@ public class SecurityManager configureHostPlugins(configuration); } - public static Subject getThreadSubject() + public static Principal getThreadPrincipal() + { + return _principal.get(); + } + + public static void setThreadPrincipal(Principal principal) { - return _subject.get(); + _principal.set(principal); } - public static void setThreadSubject(final Subject subject) + public static void setThreadPrincipal(String authId) { - _subject.set(subject); + setThreadPrincipal(new UsernamePrincipal(authId)); } public void configureHostPlugins(ConfigurationPlugin hostConfig) throws ConfigurationException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index e4bf8df340..70a9ea5356 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -149,9 +149,9 @@ public class ObjectProperties extends HashMap<ObjectProperties.Property, String> { put(Property.OWNER, queue.getOwner()); } - else if (queue.getAuthorizationHolder() != null) + else if (queue.getPrincipalHolder() != null) { - put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName()); + put(Property.OWNER, queue.getPrincipalHolder().getPrincipal().getName()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java index 8c2d60a660..62967ef7eb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java @@ -20,93 +20,42 @@ */ package org.apache.qpid.server.security.auth; -import javax.security.auth.Subject; - -/** - * Encapsulates the result of an attempt to authenticate. - * <p> - * The authentication status describes the overall outcome. - * <p> - * <ol> - * <li>If authentication status is SUCCESS, the subject will be populated. - * </li> - * <li>If authentication status is CONTINUE, the authentication has failed because the user - * supplied incorrect credentials (etc). If the authentication requires it, the next challenge - * is made available. - * </li> - * <li>If authentication status is ERROR , the authentication decision could not be made due - * to a failure (such as an external system), the {@link AuthenticationResult#getCause()} - * will provide the underlying exception. - * </li> - * </ol> - * - */ public class AuthenticationResult { public enum AuthenticationStatus { - /** Authentication successful */ - SUCCESS, - /** Authentication not successful due to credentials problem etc */ - CONTINUE, - /** Problem prevented the authentication from being made e.g. failure of an external system */ - ERROR + SUCCESS, CONTINUE, ERROR } - public final AuthenticationStatus _status; - public final byte[] _challenge; - private final Exception _cause; - private final Subject _subject; + public AuthenticationStatus status; + public byte[] challenge; + + private Exception cause; - public AuthenticationResult(final AuthenticationStatus status) + public AuthenticationResult(AuthenticationStatus status) { this(null, status, null); } - public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status) + public AuthenticationResult(byte[] challenge, AuthenticationStatus status) { this(challenge, status, null); } - public AuthenticationResult(final AuthenticationStatus error, final Exception cause) + public AuthenticationResult(AuthenticationStatus error, Exception cause) { this(null, error, cause); } - public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status, final Exception cause) - { - this._status = status; - this._challenge = challenge; - this._cause = cause; - this._subject = null; - } - - public AuthenticationResult(final Subject subject) + public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause) { - this._status = AuthenticationStatus.SUCCESS; - this._challenge = null; - this._cause = null; - this._subject = subject; + this.status = status; + this.challenge = challenge; + this.cause = cause; } public Exception getCause() { - return _cause; - } - - public AuthenticationStatus getStatus() - { - return _status; - } - - public byte[] getChallenge() - { - return _challenge; + return cause; } - - public Subject getSubject() - { - return _subject; - } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java new file mode 100644 index 0000000000..5cebb7d2d8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -0,0 +1,221 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.database; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; + +import org.apache.log4j.Logger; + +import org.apache.qpid.configuration.PropertyUtils; +import org.apache.qpid.configuration.PropertyException; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean; +import org.apache.qpid.AMQException; + +import javax.management.JMException; + +public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager +{ + private static final Logger _logger = Logger.getLogger(ConfigurationFilePrincipalDatabaseManager.class); + + Map<String, PrincipalDatabase> _databases; + + public ConfigurationFilePrincipalDatabaseManager(ServerConfiguration _configuration) throws Exception + { + _logger.info("Initialising PrincipalDatabase authentication manager"); + _databases = initialisePrincipalDatabases(_configuration); + } + + private Map<String, PrincipalDatabase> initialisePrincipalDatabases(ServerConfiguration _configuration) throws Exception + { + List<String> databaseNames = _configuration.getPrincipalDatabaseNames(); + List<String> databaseClasses = _configuration.getPrincipalDatabaseClass(); + Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>(); + + if (databaseNames.size() == 0) + { + _logger.warn("No Principal databases specified. Broker running with NO AUTHENTICATION"); + } + + for (int i = 0; i < databaseNames.size(); i++) + { + Object o; + try + { + o = Class.forName(databaseClasses.get(i)).newInstance(); + } + catch (Exception e) + { + throw new Exception("Error initialising principal database: " + e, e); + } + + if (!(o instanceof PrincipalDatabase)) + { + throw new Exception("Principal databases must implement the PrincipalDatabase interface"); + } + + initialisePrincipalDatabase((PrincipalDatabase) o, _configuration, i); + + String name = databaseNames.get(i); + if ((name == null) || (name.length() == 0)) + { + throw new Exception("Principal database names must have length greater than or equal to one character"); + } + + PrincipalDatabase pd = databases.get(name); + if (pd != null) + { + throw new Exception("Duplicate principal database name not permitted"); + } + + _logger.info("Initialised principal database '" + name + "' successfully"); + databases.put(name, (PrincipalDatabase) o); + } + + return databases; + } + + private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, ServerConfiguration _configuration, int index) + throws FileNotFoundException, ConfigurationException + { + List<String> argumentNames = _configuration.getPrincipalDatabaseAttributeNames(index); + List<String> argumentValues = _configuration.getPrincipalDatabaseAttributeValues(index); + for (int i = 0; i < argumentNames.size(); i++) + { + String argName = argumentNames.get(i); + if ((argName == null) || (argName.length() == 0)) + { + throw new ConfigurationException("Argument names must have length >= 1 character"); + } + + if (Character.isLowerCase(argName.charAt(0))) + { + argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); + } + + String methodName = "set" + argName; + Method method = null; + try + { + method = principalDatabase.getClass().getMethod(methodName, String.class); + } + catch (Exception e) + { + // do nothing.. as on error method will be null + } + + if (method == null) + { + throw new ConfigurationException("No method " + methodName + " found in class " + + principalDatabase.getClass() + + " hence unable to configure principal database. The method must be public and " + + "have a single String argument with a void return type"); + } + + try + { + method.invoke(principalDatabase, PropertyUtils.replaceProperties(argumentValues.get(i))); + } + catch (Exception ite) + { + if (ite instanceof ConfigurationException) + { + throw(ConfigurationException) ite; + } + else + { + throw new ConfigurationException(ite.getMessage(), ite); + } + } + } + } + + public Map<String, PrincipalDatabase> getDatabases() + { + return _databases; + } + + public void initialiseManagement(ServerConfiguration config) throws ConfigurationException + { + try + { + AMQUserManagementMBean _mbean = new AMQUserManagementMBean(); + + List<String> principalDBs = config.getManagementPrincipalDBs(); + if (principalDBs.isEmpty()) + { + throw new ConfigurationException("No principal-database specified for jmx security"); + } + + String databaseName = principalDBs.get(0); + PrincipalDatabase database = getDatabases().get(databaseName); + if (database == null) + { + throw new ConfigurationException("Principal-database '" + databaseName + "' not found"); + } + + _mbean.setPrincipalDatabase(database); + + List<String> jmxaccesslist = config.getManagementAccessList(); + if (jmxaccesslist.isEmpty()) + { + throw new ConfigurationException("No access control files specified for jmx security"); + } + + String jmxaccesssFile = null; + + try + { + jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0)); + } + catch (PropertyException e) + { + throw new ConfigurationException("Unable to parse access control filename '" + jmxaccesssFile + "'"); + } + + try + { + _mbean.setAccessFile(jmxaccesssFile); + } + catch (IOException e) + { + _logger.warn("Unable to load access file:" + jmxaccesssFile); + } + + _mbean.register(); + } + catch (JMException e) + { + _logger.warn("User management disabled as unable to create MBean:" + e); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java new file mode 100644 index 0000000000..f9882f8810 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.auth.database; + +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; + +import java.util.Map; + +public interface PrincipalDatabaseManager +{ + public Map<String, PrincipalDatabase> getDatabases(); + + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java new file mode 100644 index 0000000000..8658101cd8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.auth.database; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.ServerConfiguration; + +import java.util.Map; +import java.util.Properties; +import java.util.HashMap; + +public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseManager +{ + + Map<String, PrincipalDatabase> _databases = new HashMap<String, PrincipalDatabase>(); + + public PropertiesPrincipalDatabaseManager(String name, Properties users) + { + _databases.put(name, new PropertiesPrincipalDatabase(users)); + } + + public Map<String, PrincipalDatabase> getDatabases() + { + return _databases; + } + + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException + { + //todo + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java index 208130379e..ee4336055b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java @@ -20,9 +20,19 @@ */ package org.apache.qpid.server.security.auth.management; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.security.AccessControlContext; +import java.security.AccessController; import java.security.Principal; +import java.util.Enumeration; import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import javax.management.JMException; import javax.management.openmbean.CompositeData; @@ -34,13 +44,17 @@ import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; +import javax.management.remote.JMXPrincipal; +import javax.security.auth.Subject; import javax.security.auth.login.AccountNotFoundException; +import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.management.common.mbeans.UserManagement; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; @@ -51,18 +65,22 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class); private PrincipalDatabase _principalDatabase; + private Properties _accessRights; + private File _accessFile; + + private ReentrantLock _accessRightsUpdate = new ReentrantLock(); // Setup for the TabularType - private static final TabularType _userlistDataType; // Datatype for representing User Lists - private static final CompositeType _userDataType; // Composite type for representing User + static TabularType _userlistDataType; // Datatype for representing User Lists + static CompositeType _userDataType; // Composite type for representing User static { OpenType[] userItemTypes = new OpenType[4]; // User item types. userItemTypes[0] = SimpleType.STRING; // For Username - userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read - No longer in use - userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write - No longer in use - userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin - No longer is use + userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read + userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write + userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin try { @@ -74,11 +92,12 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana } catch (OpenDataException e) { - _logger.error("Tabular data setup for viewing users incorrect.", e); - throw new ExceptionInInitializerError("Tabular data setup for viewing users incorrect"); + _logger.error("Tabular data setup for viewing users incorrect."); + _userlistDataType = null; } } + public AMQUserManagementMBean() throws JMException { super(UserManagement.class, UserManagement.TYPE); @@ -91,23 +110,121 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana public boolean setPassword(String username, String password) { + return setPassword(username, password.toCharArray()); + } + + public boolean setPassword(String username, char[] password) + { try { //delegate password changes to the Principal Database - return _principalDatabase.updatePassword(new UsernamePrincipal(username), password.toCharArray()); + return _principalDatabase.updatePassword(new UsernamePrincipal(username), password); } catch (AccountNotFoundException e) { - _logger.warn("Attempt to set password of non-existent user'" + username + "'"); + _logger.warn("Attempt to set password of non-existant user'" + username + "'"); return false; } } - public boolean createUser(String username, String password) + public boolean setRights(String username, boolean read, boolean write, boolean admin) { - if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password.toCharArray())) + + Object oldRights = null; + if ((oldRights =_accessRights.get(username)) == null) { - return true; + // If the user doesn't exist in the access rights file check that they at least have an account. + if (_principalDatabase.getUser(username) == null) + { + return false; + } + } + + try + { + _accessRightsUpdate.lock(); + + // Update the access rights + if (admin) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN); + } + else + { + if (read | write) + { + if (read) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY); + } + if (write) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE); + } + } + else + { + _accessRights.remove(username); + } + } + + //save the rights file + try + { + saveAccessFile(); + } + catch (IOException e) + { + _logger.warn("Problem occured saving '" + _accessFile + "', the access right changes will not be preserved: " + e); + + //the rights file was not successfully saved, restore user rights to previous value + _logger.warn("Reverting attempted rights update for user'" + username + "'"); + if (oldRights != null) + { + _accessRights.put(username, oldRights); + } + else + { + _accessRights.remove(username); + } + + return false; + } + } + finally + { + _accessRightsUpdate.unlock(); + } + + return true; + } + + public boolean createUser(String username, String password, boolean read, boolean write, boolean admin) + { + return createUser(username, password.toCharArray(), read, write, admin); + } + + public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin) + { + if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) + { + if (!setRights(username, read, write, admin)) + { + //unable to set rights for user, remove account + try + { + _principalDatabase.deletePrincipal(new UsernamePrincipal(username)); + } + catch (AccountNotFoundException e) + { + //ignore + } + return false; + } + else + { + return true; + } } return false; @@ -117,7 +234,29 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana { try { - _principalDatabase.deletePrincipal(new UsernamePrincipal(username)); + if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username))) + { + try + { + _accessRightsUpdate.lock(); + + _accessRights.remove(username); + + try + { + saveAccessFile(); + } + catch (IOException e) + { + _logger.warn("Problem occured saving '" + _accessFile + "', the access right changes will not be preserved: " + e); + return false; + } + } + finally + { + _accessRightsUpdate.unlock(); + } + } } catch (AccountNotFoundException e) { @@ -130,23 +269,38 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana public boolean reloadData() { - try - { - _principalDatabase.reload(); - } - catch (IOException e) - { - _logger.warn("Reload failed due to:", e); - return false; - } - // Reload successful - return true; + try + { + loadAccessFile(); + _principalDatabase.reload(); + } + catch (ConfigurationException e) + { + _logger.warn("Reload failed due to:" + e); + return false; + } + catch (IOException e) + { + _logger.warn("Reload failed due to:" + e); + return false; + } + // Reload successful + return true; } - @MBeanOperation(name = "viewUsers", description = "All users that are currently available to the system.") + @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.") public TabularData viewUsers() { + // Table of users + // Username(string), Access rights Read,Write,Admin(bool,bool,bool) + + if (_userlistDataType == null) + { + _logger.warn("TabluarData not setup correctly"); + return null; + } + List<Principal> users = _principalDatabase.getUsers(); TabularDataSupport userList = new TabularDataSupport(_userlistDataType); @@ -157,15 +311,29 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana for (Principal user : users) { // Create header attributes list - // Read,Write,Admin items are depcreated and we return always false. - Object[] itemData = {user.getName(), false, false, false}; + + String rights = (String) _accessRights.get(user.getName()); + + Boolean read = false; + Boolean write = false; + Boolean admin = false; + + if (rights != null) + { + read = rights.equals(MBeanInvocationHandlerImpl.READONLY) + || rights.equals(MBeanInvocationHandlerImpl.READWRITE); + write = rights.equals(MBeanInvocationHandlerImpl.READWRITE); + admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN); + } + + Object[] itemData = {user.getName(), read, write, admin}; CompositeData messageData = new CompositeDataSupport(_userDataType, COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData); userList.put(messageData); } } catch (OpenDataException e) { - _logger.warn("Unable to create user list due to :", e); + _logger.warn("Unable to create user list due to :" + e); return null; } @@ -183,4 +351,187 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana { _principalDatabase = database; } + + /** + * setAccessFile + * + * @param accessFile the file to use for updating. + * + * @throws java.io.IOException If the file cannot be accessed + * @throws org.apache.commons.configuration.ConfigurationException + * if checks on the file fail. + */ + public void setAccessFile(String accessFile) throws IOException, ConfigurationException + { + if (accessFile != null) + { + _accessFile = new File(accessFile); + if (!_accessFile.exists()) + { + throw new ConfigurationException("'" + _accessFile + "' does not exist"); + } + + if (!_accessFile.canRead()) + { + throw new ConfigurationException("Cannot read '" + _accessFile + "'."); + } + + if (!_accessFile.canWrite()) + { + _logger.warn("Unable to write to access rights file '" + _accessFile + "', changes will not be preserved."); + } + + loadAccessFile(); + } + else + { + _logger.warn("Access rights file specified is null. Access rights not changed."); + } + } + + private void loadAccessFile() throws IOException, ConfigurationException + { + if(_accessFile == null) + { + _logger.error("No jmx access rights file has been specified."); + return; + } + + if(_accessFile.exists()) + { + try + { + _accessRightsUpdate.lock(); + + Properties accessRights = new Properties(); + FileInputStream inStream = new FileInputStream(_accessFile); + try + { + accessRights.load(inStream); + } + finally + { + inStream.close(); + } + + checkAccessRights(accessRights); + setAccessRights(accessRights); + } + finally + { + _accessRightsUpdate.unlock(); + } + } + else + { + _logger.error("Specified jmxaccess rights file '" + _accessFile + "' does not exist."); + } + } + + private void checkAccessRights(Properties accessRights) + { + Enumeration values = accessRights.propertyNames(); + + while (values.hasMoreElements()) + { + String user = (String) values.nextElement(); + + if (_principalDatabase.getUser(user) == null) + { + _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user"); + } + } + } + + private void saveAccessFile() throws IOException + { + try + { + _accessRightsUpdate.lock(); + + // Create temporary file + Random r = new Random(); + File tmp; + do + { + tmp = new File(_accessFile.getPath() + r.nextInt() + ".tmp"); + } + while(tmp.exists()); + + tmp.deleteOnExit(); + + FileOutputStream output = new FileOutputStream(tmp); + _accessRights.store(output, "Generated by AMQUserManagementMBean Console : Last edited by user:" + getCurrentJMXUser()); + output.close(); + + // Swap temp file to main rights file. + File old = new File(_accessFile.getAbsoluteFile() + ".old"); + if (old.exists()) + { + old.delete(); + } + + if(!_accessFile.renameTo(old)) + { + //unable to rename the existing file to the backup name + _logger.error("Could not backup the existing management rights file"); + throw new IOException("Could not backup the existing management rights file"); + } + + if(!tmp.renameTo(_accessFile)) + { + //failed to rename the new file to the required filename + + if(!old.renameTo(_accessFile)) + { + //unable to return the backup to required filename + _logger.error("Could not rename the new management rights file into place, and unable to restore original file"); + throw new IOException("Could not rename the new management rights file into place, and unable to restore original file"); + } + + _logger.error("Could not rename the new management rights file into place"); + throw new IOException("Could not rename the new management rights file into place"); + } + } + finally + { + _accessRightsUpdate.unlock(); + } + + } + + private String getCurrentJMXUser() + { + AccessControlContext acc = AccessController.getContext(); + + Subject subject = Subject.getSubject(acc); + if (subject == null) + { + return "Unknown user, authentication Subject was null"; + } + + // Retrieve JMXPrincipal from Subject + Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class); + if (principals == null || principals.isEmpty()) + { + return "Unknown user principals were null"; + } + + Principal principal = principals.iterator().next(); + return principal.getName(); + } + + /** + * user=read user=write user=readwrite user=admin + * + * @param accessRights The properties list of access rights to process + */ + private void setAccessRights(Properties accessRights) + { + _logger.debug("Setting Access Rights:" + accessRights); + _accessRights = accessRights; + + // TODO check where this is used + // MBeanInvocationHandlerImpl.setAccessRights(_accessRights); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index 4c59c25d84..bc771162fd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -20,73 +20,17 @@ */ package org.apache.qpid.server.security.auth.manager; -import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.security.auth.AuthenticationResult; -/** - * Implementations of the AuthenticationManager are responsible for determining - * the authenticity of a user's credentials. - * - * If the authentication is successful, the manager is responsible for producing a populated - * {@link Subject} containing the user's identity and zero or more principals representing - * groups to which the user belongs. - * <p> - * The {@link #initialise()} method is responsible for registering SASL mechanisms required by - * the manager. The {@link #close()} method must reverse this registration. - * - */ -public interface AuthenticationManager extends Closeable, Plugin +public interface AuthenticationManager extends Closeable { - /** The name for the required SASL Server mechanisms */ - public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - - /** - * Initialise the authentication plugin. - * - */ - void initialise(); - - /** - * Gets the SASL mechanisms known to this manager. - * - * @return SASL mechanism names, space separated. - */ String getMechanisms(); - /** - * Creates a SASL server for the specified mechanism name for the given - * fully qualified domain name. - * - * @param mechanism mechanism name - * @param localFQDN domain name - * - * @return SASL server - * @throws SaslException - */ SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException; - /** - * Authenticates a user using SASL negotiation. - * - * @param server SASL server - * @param response SASL response to process - * - * @return authentication result - */ AuthenticationResult authenticate(SaslServer server, byte[] response); - - /** - * Authenticates a user using their username and password. - * - * @param username username - * @param password password - * - * @return authentication result - */ - AuthenticationResult authenticate(String username, String password); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManagerPluginFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManagerPluginFactory.java deleted file mode 100644 index a51f195761..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManagerPluginFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security.auth.manager; - -import org.apache.qpid.server.plugins.PluginFactory; - -/** - * Factory producing authentication producing configured, initialised authentication - * managers. - */ -public interface AuthenticationManagerPluginFactory<S extends AuthenticationManager> extends PluginFactory<S> -{ - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 1945c2e15f..2a967f02af 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -20,65 +20,27 @@ */ package org.apache.qpid.server.security.auth.manager; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.Security; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import javax.security.auth.Subject; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.login.AccountNotFoundException; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslServerFactory; - +import org.apache.log4j.Logger; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.configuration.PropertyException; -import org.apache.qpid.configuration.PropertyUtils; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.security.auth.AuthenticationResult; -import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean; -import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.JCAProvider; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; +import org.apache.qpid.server.security.auth.AuthenticationResult; +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.SaslServerFactory; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import java.util.Map; +import java.util.HashMap; +import java.util.TreeMap; +import java.security.Security; -/** - * Concrete implementation of the AuthenticationManager that determines if supplied - * user credentials match those appearing in a PrincipalDatabase. The implementation - * of the PrincipalDatabase is determined from the configuration. - * - * This implementation also registers the JMX UserManagemement MBean. - * - * This plugin expects configuration such as: - * - * <pre> - * <pd-auth-manager> - * <principal-database> - * <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class> - * <attributes> - * <attribute> - * <name>passwordFile</name> - * <value>${conf}/passwd</value> - * </attribute> - * </attributes> - * </principal-database> - * </pd-auth-manager> - * </pre> - */ public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager { private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class); @@ -87,109 +49,55 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private String _mechanisms; /** Maps from the mechanism to the callback handler to use for handling those requests */ - private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); + private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>(); /** * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for * details of the use of these properties. This map is populated during initialisation of each provider. */ - private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); - - protected PrincipalDatabase _principalDatabase = null; + private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); - protected AMQUserManagementMBean _mbean = null; + private AuthenticationManager _default = null; + /** The name for the required SASL Server mechanisms */ + public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>() + public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception { - public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException - { - final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName()); + _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'") + + " PrincipalDatabase authentication manager."); - // If there is no configuration for this plugin then don't load it. - if (configuration == null) - { - _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager"); - return null; - } + // Fixme This should be done per Vhost but allowing global hack isn't right but ... + // required as authentication is done before Vhost selection - final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager(); - pdam.configure(configuration); - pdam.initialise(); - return pdam; - } + Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); - public Class<PrincipalDatabaseAuthenticationManager> getPluginClass() - { - return PrincipalDatabaseAuthenticationManager.class; - } - public String getPluginName() + if (name == null || hostConfig == null) { - return PrincipalDatabaseAuthenticationManager.class.getName(); + initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases()); } - }; - - public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin { - - public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory() + else { - public List<String> getParentPaths() - { - return Arrays.asList("security.pd-auth-manager"); - } + String databaseName = hostConfig.getAuthenticationDatabase(); - public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException + if (databaseName == null) { - final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration(); - - instance.setConfiguration(path, config); - return instance; - } - }; - public String[] getElementsProcessed() - { - return new String[] {"principal-database.class", - "principal-database.attributes.attribute.name", - "principal-database.attributes.attribute.value"}; - } - - public void validateConfiguration() throws ConfigurationException - { - } - - public String getPrincipalDatabaseClass() - { - return _configuration.getString("principal-database.class"); - } - - public Map<String,String> getPdClassAttributeMap() throws ConfigurationException - { - final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name"); - final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value"); - final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size()); - - for (int i = 0; i < argumentNames.size(); i++) + _default = ApplicationRegistry.getInstance().getAuthenticationManager(); + return; + } + else { - final String argName = argumentNames.get(i); - final String argValue = argumentValues.get(i); + PrincipalDatabase database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(databaseName); - attributes.put(argName, argValue); - } + if (database == null) + { + throw new ConfigurationException("Requested database:" + databaseName + " was not found"); + } - return Collections.unmodifiableMap(attributes); + initialiseAuthenticationMechanisms(providerMap, database); + } } - } - - protected PrincipalDatabaseAuthenticationManager() - { - } - - public void initialise() - { - final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); - - initialiseAuthenticationMechanisms(providerMap, _principalDatabase); if (providerMap.size() > 0) { @@ -202,16 +110,33 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan { _logger.info("Additional SASL providers successfully registered."); } + } else { _logger.warn("No additional SASL providers registered."); } - registerManagement(); } - private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) + + private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception + { + if (databases.size() > 1) + { + _logger.warn("More than one principle database provided currently authentication mechanism will override each other."); + } + + for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet()) + { + // fixme As the database now provide the mechanisms they support, they will ... + // overwrite each other in the map. There should only be one database per vhost. + // But currently we must have authentication before vhost definition. + initialiseAuthenticationMechanisms(providerMap, entry.getValue()); + } + } + + private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception { if (database == null || database.getMechanisms().size() == 0) { @@ -227,6 +152,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser, Map<String, Class<? extends SaslServerFactory>> providerMap) + throws Exception { if (_mechanisms == null) { @@ -247,217 +173,65 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan _logger.info("Initialised " + mechanism + " SASL provider successfully"); } - /** - * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin) - */ - public void configure(final ConfigurationPlugin config) throws ConfigurationException - { - final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config; - final String pdClazz = pdamConfig.getPrincipalDatabaseClass(); - - _logger.info("PrincipalDatabase concrete implementation : " + pdClazz); - - _principalDatabase = createPrincipalDatabaseImpl(pdClazz); - - configPrincipalDatabase(_principalDatabase, pdamConfig); - } - public String getMechanisms() { - return _mechanisms; - } - - public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException - { - return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), - _callbackHandlerMap.get(mechanism)); - } - - /** - * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[]) - */ - public AuthenticationResult authenticate(SaslServer server, byte[] response) - { - try + if (_default != null) { - // Process response from the client - byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]); - - if (server.isComplete()) - { - final Subject subject = new Subject(); - subject.getPrincipals().add(new UsernamePrincipal(server.getAuthorizationID())); - return new AuthenticationResult(subject); - } - else - { - return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE); - } + // Use the default AuthenticationManager if present + return _default.getMechanisms(); } - catch (SaslException e) + else { - return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); + return _mechanisms; } } - /** - * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String) - */ - public AuthenticationResult authenticate(final String username, final String password) + public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException { - try + if (_default != null) { - if (_principalDatabase.verifyPassword(username, password.toCharArray())) - { - final Subject subject = new Subject(); - subject.getPrincipals().add(new UsernamePrincipal(username)); - return new AuthenticationResult(subject); - } - else - { - return new AuthenticationResult(AuthenticationStatus.CONTINUE); - } + // Use the default AuthenticationManager if present + return _default.createSaslServer(mechanism, localFQDN); } - catch (AccountNotFoundException e) + else { - return new AuthenticationResult(AuthenticationStatus.CONTINUE); + return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism), + _callbackHandlerMap.get(mechanism)); } - } - - public void close() - { - _mechanisms = null; - Security.removeProvider(PROVIDER_NAME); - unregisterManagement(); } - private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException + public AuthenticationResult authenticate(SaslServer server, byte[] response) { - try - { - return (PrincipalDatabase) Class.forName(pdClazz).newInstance(); - } - catch (InstantiationException ie) - { - throw new ConfigurationException("Cannot instantiate " + pdClazz, ie); - } - catch (IllegalAccessException iae) + // Use the default AuthenticationManager if present + if (_default != null) { - throw new ConfigurationException("Cannot access " + pdClazz, iae); + return _default.authenticate(server, response); } - catch (ClassNotFoundException cnfe) - { - throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe); - } - catch (ClassCastException cce) - { - throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce); - } - } - private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config) - throws ConfigurationException - { - final Map<String,String> attributes = config.getPdClassAttributeMap(); - - for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();) + try { - final Entry<String, String> nameValuePair = iterator.next(); - final String methodName = generateSetterName(nameValuePair.getKey()); - final Method method; - try - { - method = principalDatabase.getClass().getMethod(methodName, String.class); - } - catch (Exception e) - { - throw new ConfigurationException("No method " + methodName + " found in class " - + principalDatabase.getClass() - + " hence unable to configure principal database. The method must be public and " - + "have a single String argument with a void return type", e); - } - try - { - method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue())); - } - catch (IllegalArgumentException e) - { - throw new ConfigurationException(e.getMessage(), e); - } - catch (PropertyException e) - { - throw new ConfigurationException(e.getMessage(), e); - } - catch (IllegalAccessException e) + // Process response from the client + byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]); + + if (server.isComplete()) { - throw new ConfigurationException(e.getMessage(), e); + return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS); } - catch (InvocationTargetException e) + else { - // QPID-1347.. InvocationTargetException wraps the checked exception thrown from the reflective - // method call. Pull out the underlying message and cause to make these more apparent to the user. - throw new ConfigurationException(e.getCause().getMessage(), e.getCause()); + return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE); } } - } - - private String generateSetterName(String argName) throws ConfigurationException - { - if ((argName == null) || (argName.length() == 0)) - { - throw new ConfigurationException("Argument names must have length >= 1 character"); - } - - if (Character.isLowerCase(argName.charAt(0))) - { - argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); - } - - final String methodName = "set" + argName; - return methodName; - } - - protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase) - { - _principalDatabase = principalDatabase; - } - - protected void registerManagement() - { - try - { - _logger.info("Registering UserManagementMBean"); - - _mbean = new AMQUserManagementMBean(); - _mbean.setPrincipalDatabase(_principalDatabase); - _mbean.register(); - } - catch (Exception e) + catch (SaslException e) { - _logger.warn("User management disabled as unable to create MBean:", e); - _mbean = null; + return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); } } - protected void unregisterManagement() + public void close() { - try - { - if (_mbean != null) - { - _logger.info("Unregistering UserManagementMBean"); - _mbean.unregister(); - } - } - catch (Exception e) - { - _logger.warn("Failed to unregister User management MBean:", e); - } - finally - { - _mbean = null; - } + Security.removeProvider(PROVIDER_NAME); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java index b7985ad972..0cbbccb3b8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.security.auth.rmi; +import java.util.Collections; + import javax.management.remote.JMXAuthenticator; import javax.management.remote.JMXPrincipal; import javax.security.auth.Subject; +import javax.security.auth.login.AccountNotFoundException; -import org.apache.qpid.server.security.auth.AuthenticationResult; -import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; public class RMIPasswordAuthenticator implements JMXAuthenticator { @@ -38,15 +39,15 @@ public class RMIPasswordAuthenticator implements JMXAuthenticator static final String CREDENTIALS_REQUIRED = "User details are required. " + "Please ensure you are using an up to date management console to connect."; - private AuthenticationManager _authenticationManager = null; + private PrincipalDatabase _db = null; public RMIPasswordAuthenticator() { } - - public void setAuthenticationManager(final AuthenticationManager authenticationManager) + + public void setPrincipalDatabase(PrincipalDatabase pd) { - _authenticationManager = authenticationManager; + this._db = pd; } public Subject authenticate(Object credentials) throws SecurityException @@ -64,39 +65,50 @@ public class RMIPasswordAuthenticator implements JMXAuthenticator } } - // Verify that required number of credentials. + // Verify that required number of credential's. final String[] userCredentials = (String[]) credentials; if (userCredentials.length != 2) { throw new SecurityException(SHOULD_HAVE_2_ELEMENTS); } - final String username = (String) userCredentials[0]; - final String password = (String) userCredentials[1]; + String username = (String) userCredentials[0]; + String password = (String) userCredentials[1]; - // Verify that all required credentials are actually present. + // Verify that all required credential's are actually present. if (username == null || password == null) { throw new SecurityException(SHOULD_BE_NON_NULL); } - // Verify that an AuthenticationManager has been set. - if (_authenticationManager == null) + // Verify that a PD has been set. + if (_db == null) { throw new SecurityException(UNABLE_TO_LOOKUP); } - final AuthenticationResult result = _authenticationManager.authenticate(username, password); + + boolean authenticated = false; - if (AuthenticationStatus.ERROR.equals(result.getStatus())) + // Perform authentication + try { - throw new SecurityException("Authentication manager failed", result.getCause()); + if (_db.verifyPassword(username, password.toCharArray())) + { + authenticated = true; + } + } + catch (AccountNotFoundException e) + { + throw new SecurityException(INVALID_CREDENTIALS); // XXX } - else if (AuthenticationStatus.SUCCESS.equals(result.getStatus())) + + if (authenticated) { - final Subject subject = result.getSubject(); - subject.getPrincipals().add(new JMXPrincipal(username)); - subject.setReadOnly(); - return subject; + //credential's check out, return the appropriate JAAS Subject + return new Subject(true, + Collections.singleton(new JMXPrincipal(username)), + Collections.EMPTY_SET, + Collections.EMPTY_SET); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java index bc5d8a4f2b..89e545d6f5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java @@ -25,6 +25,9 @@ import java.util.Map; import javax.security.auth.callback.CallbackHandler; import javax.security.sasl.SaslServerFactory; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; + public interface AuthenticationProviderInitialiser { /** @@ -34,6 +37,24 @@ public interface AuthenticationProviderInitialiser String getMechanismName(); /** + * Initialise the authentication provider. + * @param baseConfigPath the path in the config file that points to any config options for this provider. Each + * provider can have its own set of configuration options + * @param configuration the Apache Commons Configuration instance used to configure this provider + * @param principalDatabases the set of principal databases that are available + * @throws Exception needs refined Exception is too broad. + */ + void initialise(String baseConfigPath, Configuration configuration, + Map<String, PrincipalDatabase> principalDatabases) throws Exception; + + /** + * Initialise the authentication provider. + * @param db The principal database to initialise with + */ + void initialise(PrincipalDatabase db); + + + /** * @return the callback handler that should be used to process authentication requests for this mechanism. This will * be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be * fully threadsafe. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipal.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipal.java deleted file mode 100644 index 30a503c769..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/GroupPrincipal.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.security.auth.sasl; - -import java.security.Principal; -import java.security.acl.Group; -import java.util.Enumeration; - -/** - * Immutable representation of a user group. In Qpid, groups do <b>not</b> know - * about their membership, and therefore the {@link #addMember(Principal)} - * methods etc throw {@link UnsupportedOperationException}. - * - */ -public class GroupPrincipal implements Group -{ - /** Name of the group */ - private final String _groupName; - - public GroupPrincipal(final String groupName) - { - _groupName = groupName; - } - - public String getName() - { - return _groupName; - } - - public boolean addMember(Principal user) - { - throw new UnsupportedOperationException("Not supported"); - } - - public boolean removeMember(Principal user) - { - throw new UnsupportedOperationException("Not supported"); - } - - public boolean isMember(Principal member) - { - throw new UnsupportedOperationException("Not supported"); - } - - public Enumeration<? extends Principal> members() - { - throw new UnsupportedOperationException("Not supported"); - } - - /** - * @see java.lang.Object#hashCode() - */ - public int hashCode() - { - final int prime = 37; - return prime * _groupName.hashCode(); - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - else - { - if (obj instanceof GroupPrincipal) - { - GroupPrincipal other = (GroupPrincipal) obj; - return _groupName.equals(other._groupName); - } - else - { - return false; - } - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java index b4ee13fe6b..d7c8383690 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java @@ -21,21 +21,14 @@ package org.apache.qpid.server.security.auth.sasl; import java.security.Principal; -import java.util.Set; - -import javax.security.auth.Subject; /** A principal that is just a wrapper for a simple username. */ public class UsernamePrincipal implements Principal { - private final String _name; + private String _name; public UsernamePrincipal(String name) { - if (name == null) - { - throw new IllegalArgumentException("name cannot be null"); - } _name = name; } @@ -48,53 +41,4 @@ public class UsernamePrincipal implements Principal { return _name; } - - /** - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() - { - final int prime = 31; - return prime * _name.hashCode(); - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - else - { - if (obj instanceof UsernamePrincipal) - { - UsernamePrincipal other = (UsernamePrincipal) obj; - return _name.equals(other._name); - } - else - { - return false; - } - } - } - - public static UsernamePrincipal getUsernamePrincipalFromSubject(final Subject authSubject) - { - if (authSubject == null) - { - throw new IllegalArgumentException("No authenticated subject."); - } - - final Set<UsernamePrincipal> principals = authSubject.getPrincipals(UsernamePrincipal.class); - if (principals.size() != 1) - { - throw new IllegalArgumentException("Can't find single UsernamePrincipal in authenticated subject"); - } - return principals.iterator().next(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java index 17d123eb0d..67d20136bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java @@ -45,10 +45,9 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java index 8a5ff7df2d..6032255870 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java @@ -47,11 +47,10 @@ public class AnonymousSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE) || - props.containsKey(Sasl.POLICY_NOANONYMOUS))) + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE) || + props.containsKey(Sasl.POLICY_NOANONYMOUS)) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java index 139818735f..8020d97364 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java @@ -70,7 +70,7 @@ public class CRAMMD5HexInitialiser extends UsernamePasswordInitialiser for (char c : password) { //toHexString does not prepend 0 so we have to - if (((byte) c > -1) && (byte) c < 0x10 ) + if (((byte) c > -1) && (byte) c < 10) { sb.append(0); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java index 3144bfbce6..f0dd9eeb6d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java @@ -45,10 +45,9 @@ public class PlainSaslServerFactory implements SaslServerFactory public String[] getMechanismNames(Map props) { - if (props != null && - (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE))) + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) { // returned array must be non null according to interface documentation return new String[0]; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 33aebffcfb..6cc5e7b019 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -259,7 +259,7 @@ public class AMQStateManager implements AMQMethodListener public AMQProtocolSession getProtocolSession() { - SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject()); + SecurityManager.setThreadPrincipal(_protocolSession.getPrincipal()); return _protocolSession; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java deleted file mode 100644 index b732121180..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.stats; - -import java.util.Date; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class collects statistics and counts the total, rate per second and - * peak rate per second values for the events that are registered with it. - */ -public class StatisticsCounter -{ - private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class); - - public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s - public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable"); - - private static final String COUNTER = "counter"; - private static final AtomicLong _counterIds = new AtomicLong(0L); - - private long _peak = 0L; - private long _total = 0L; - private long _temp = 0L; - private long _last = 0L; - private long _rate = 0L; - - private long _start; - - private final long _period; - private final String _name; - - public StatisticsCounter() - { - this(COUNTER); - } - - public StatisticsCounter(String name) - { - this(name, DEFAULT_SAMPLE_PERIOD); - } - - public StatisticsCounter(String name, long period) - { - _period = period; - _name = name + "-" + + _counterIds.incrementAndGet(); - reset(); - } - - public void registerEvent() - { - registerEvent(1L); - } - - public void registerEvent(long value) - { - registerEvent(value, System.currentTimeMillis()); - } - - public void registerEvent(long value, long timestamp) - { - if (DISABLE_STATISTICS) - { - return; - } - - long thisSample = (timestamp / _period); - synchronized (this) - { - if (thisSample > _last) - { - _last = thisSample; - _rate = _temp; - _temp = 0L; - if (_rate > _peak) - { - _peak = _rate; - } - } - - _total += value; - _temp += value; - } - } - - /** - * Update the current rate and peak - may reset rate to zero if a new - * sample period has started. - */ - private void update() - { - registerEvent(0L, System.currentTimeMillis()); - } - - /** - * Reset - */ - public void reset() - { - _log.info("Resetting statistics for counter: " + _name); - _peak = 0L; - _rate = 0L; - _total = 0L; - _start = System.currentTimeMillis(); - _last = _start / _period; - } - - public double getPeak() - { - update(); - return (double) _peak / ((double) _period / 1000.0d); - } - - public double getRate() - { - update(); - return (double) _rate / ((double) _period / 1000.0d); - } - - public long getTotal() - { - return _total; - } - - public long getStart() - { - return _start; - } - - public Date getStartTime() - { - return new Date(_start); - } - - public String getName() - { - return _name; - } - - public long getPeriod() - { - return _period; - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java deleted file mode 100644 index 36fec4025a..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.stats; - -/** - * This interface is to be implemented by any broker business object that - * wishes to gather statistics about messages delivered through it. - * - * These statistics are exposed using a separate JMX Mbean interface, which - * calls these methods to retrieve the underlying {@link StatisticsCounter}s - * and return their attributes. This interface gives a standard way for - * parts of the broker to set up and configure statistics generation. - * <p> - * When creating these objects, there should be a parent/child relationship - * between them, such that the lowest level gatherer can record staticics if - * enabled, and pass on the notification to the parent object to allow higher - * level aggregation. When resetting statistics, this works in the opposite - * direction, with higher level gatherers also resetting all of their children. - */ -public interface StatisticsGatherer -{ - /** - * Initialise the statistics gathering for this object. - * - * This method is responsible for creating any {@link StatisticsCounter} - * objects and for determining whether statistics generation should be - * enabled, by checking broker and system configuration. - * - * @see StatisticsCounter#DISABLE_STATISTICS - */ - void initialiseStatistics(); - - /** - * This method is responsible for registering the receipt of a message - * with the counters, and also for passing this notification to any parent - * {@link StatisticsGatherer}s. If statistics generation is not enabled, - * then this method should simple delegate to the parent gatherer. - * - * @param messageSize the size in bytes of the delivered message - * @param timestamp the time the message was delivered - */ - void registerMessageReceived(long messageSize, long timestamp); - - /** - * This method is responsible for registering the delivery of a message - * with the counters. Message delivery is recorded by the counter using - * the current system time, as opposed to the message timestamp. - * - * @param messageSize the size in bytes of the delivered message - * @see #registerMessageReceived(long, long) - */ - void registerMessageDelivered(long messageSize); - - /** - * Gives access to the {@link StatisticsCounter} that is used to count - * delivered message statistics. - * - * @return the {@link StatisticsCounter} that counts delivered messages - */ - StatisticsCounter getMessageDeliveryStatistics(); - - /** - * Gives access to the {@link StatisticsCounter} that is used to count - * received message statistics. - * - * @return the {@link StatisticsCounter} that counts received messages - */ - StatisticsCounter getMessageReceiptStatistics(); - - /** - * Gives access to the {@link StatisticsCounter} that is used to count - * delivered message size statistics. - * - * @return the {@link StatisticsCounter} that counts delivered bytes - */ - StatisticsCounter getDataDeliveryStatistics(); - - /** - * Gives access to the {@link StatisticsCounter} that is used to count - * received message size statistics. - * - * @return the {@link StatisticsCounter} that counts received bytes - */ - StatisticsCounter getDataReceiptStatistics(); - - /** - * Reset the counters for this, and any child {@link StatisticsGatherer}s. - */ - void resetStatistics(); - - /** - * Check if this object has statistics generation enabled. - * - * @return true if statistics generation is enabled - */ - boolean isStatisticsEnabled(); - - /** - * Enable or disable statistics generation for this object. - */ - void setStatisticsEnabled(boolean enabled); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java index 3e6299cb8a..9ea81660c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java @@ -20,108 +20,121 @@ */ package org.apache.qpid.server.subscription; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.subscription.Subscription; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.nio.ByteBuffer; public class SubscriptionList { + private final SubscriptionNode _head = new SubscriptionNode(); - private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head); - private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head); - private final AtomicInteger _size = new AtomicInteger(); + private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head); + private AtomicInteger _size = new AtomicInteger(); + - public static final class SubscriptionNode + public final class SubscriptionNode { private final AtomicBoolean _deleted = new AtomicBoolean(); private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>(); private final Subscription _sub; + public SubscriptionNode() { - //used for sentinel head and dummy node construction + _sub = null; _deleted.set(true); } public SubscriptionNode(final Subscription sub) { - //used for regular node construction _sub = sub; } - /** - * Retrieves the first non-deleted node following the current node. - * Any deleted non-tail nodes encountered during the search are unlinked. - * - * @return the next non-deleted node, or null if none was found. - */ - public SubscriptionNode findNext() + + public SubscriptionNode getNext() { + SubscriptionNode next = nextNode(); while(next != null && next.isDeleted()) { + final SubscriptionNode newNext = next.nextNode(); if(newNext != null) { - //try to move our _next reference forward to the 'newNext' - //node to unlink the deleted node _next.compareAndSet(next, newNext); next = nextNode(); } else { - //'newNext' is null, meaning 'next' is the current tail. Can't unlink - //the tail node for thread safety reasons, just use the null. next = null; } - } + } return next; } - /** - * Gets the immediately next referenced node in the structure. - * - * @return the immediately next node in the structure, or null if at the tail. - */ - protected SubscriptionNode nextNode() + private SubscriptionNode nextNode() { return _next.get(); } - /** - * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set. - * - * @param node the SubscriptionNode to set as 'next' - * @return whether the operation succeeded - */ - private boolean setNext(final SubscriptionNode node) - { - return _next.compareAndSet(null, node); - } - public boolean isDeleted() { return _deleted.get(); } + public boolean delete() { - return _deleted.compareAndSet(false,true); + if(_deleted.compareAndSet(false,true)) + { + _size.decrementAndGet(); + advanceHead(); + return true; + } + else + { + return false; + } } + public Subscription getSubscription() { return _sub; } } - private void insert(final SubscriptionNode node, final boolean count) + + public SubscriptionList(AMQQueue queue) + { + } + + private void advanceHead() + { + SubscriptionNode head = _head.nextNode(); + while(head._next.get() != null && head.isDeleted()) + { + + final SubscriptionNode newhead = head.nextNode(); + if(newhead != null) + { + _head._next.compareAndSet(head, newhead); + } + head = _head.nextNode(); + } + } + + + public SubscriptionNode add(Subscription sub) { + SubscriptionNode node = new SubscriptionNode(sub); for (;;) { SubscriptionNode tail = _tail.get(); @@ -130,14 +143,11 @@ public class SubscriptionList { if (next == null) { - if (tail.setNext(node)) + if (tail._next.compareAndSet(null, node)) { _tail.compareAndSet(tail, node); - if(count) - { - _size.incrementAndGet(); - } - return; + _size.incrementAndGet(); + return node; } } else @@ -146,101 +156,27 @@ public class SubscriptionList } } } - } - public void add(final Subscription sub) - { - SubscriptionNode node = new SubscriptionNode(sub); - insert(node, true); } - public boolean remove(final Subscription sub) + public boolean remove(Subscription sub) { - SubscriptionNode prevNode = _head; - SubscriptionNode node = _head.nextNode(); - + SubscriptionNode node = _head.getNext(); while(node != null) { - if(sub.equals(node.getSubscription()) && node.delete()) + if(sub.equals(node._sub) && node.delete()) { - _size.decrementAndGet(); - - SubscriptionNode tail = _tail.get(); - if(node == tail) - { - //we cant remove the last node from the structure for - //correctness reasons, however we have just 'deleted' - //the tail. Inserting an empty dummy node after it will - //let us scavenge the node containing the Subscription. - insert(new SubscriptionNode(), false); - } - - //advance the next node reference in the 'prevNode' to scavange - //the newly 'deleted' node for the Subscription. - prevNode.findNext(); - - nodeMarkerCleanup(node); - return true; } - - prevNode = node; - node = node.findNext(); + node = node.getNext(); } - return false; } - private void nodeMarkerCleanup(final SubscriptionNode node) - { - SubscriptionNode markedNode = _subNodeMarker.get(); - if(node == markedNode) - { - //if the marked node is the one we are removing, then - //replace it with a dummy pointing at the next node. - //this is OK as the marked node is only used to index - //into the list and find the next node to use. - //Because we inserted a dummy if node was the - //tail, markedNode.nextNode() can never be null. - SubscriptionNode dummy = new SubscriptionNode(); - dummy.setNext(markedNode.nextNode()); - - //if the CAS fails the marked node has changed, thus - //we don't care about the dummy and just forget it - _subNodeMarker.compareAndSet(markedNode, dummy); - } - else if(markedNode != null) - { - //if the marked node was already deleted then it could - //hold subsequently removed nodes after it in the list - //in memory. Scavenge it to ensure their actual removal. - if(markedNode != _head && markedNode.isDeleted()) - { - markedNode.findNext(); - } - } - } - - public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode) - { - return _subNodeMarker.compareAndSet(expected, nextNode); - } - - /** - * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node - * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node - * with reference to the next node. - * - * @return the previously marked node (or a dummy if it was subsequently deleted) - */ - public SubscriptionNode getMarkedNode() - { - return _subNodeMarker.get(); - } - public static class SubscriptionNodeIterator { + private SubscriptionNode _lastNode; SubscriptionNodeIterator(SubscriptionNode startNode) @@ -248,25 +184,49 @@ public class SubscriptionList _lastNode = startNode; } + + public boolean atTail() + { + return _lastNode.nextNode() == null; + } + public SubscriptionNode getNode() { + return _lastNode; + } public boolean advance() { - SubscriptionNode nextNode = _lastNode.findNext(); - _lastNode = nextNode; - return _lastNode != null; + if(!atTail()) + { + SubscriptionNode nextNode = _lastNode.nextNode(); + while(nextNode.isDeleted() && nextNode.nextNode() != null) + { + nextNode = nextNode.nextNode(); + } + _lastNode = nextNode; + return true; + + } + else + { + return false; + } + } + } + public SubscriptionNodeIterator iterator() { return new SubscriptionNodeIterator(_head); } + public SubscriptionNode getHead() { return _head; @@ -276,6 +236,9 @@ public class SubscriptionList { return _size.get(); } + + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 68e47fd86a..b36ac84cdd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -97,6 +97,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private FlowCreditManager_0_10 _creditManager; + private StateListener _stateListener = new StateListener() { @@ -441,7 +442,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr Struct[] headers = new Struct[] { deliveryProps, messageProps }; BasicContentHeaderProperties properties = - (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); + (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties; final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); if(exchange != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index abbc5a3805..3ca22b60c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.NetworkDriver; public class QpidAcceptor { - NetworkTransport _transport; + NetworkDriver _driver; String _protocol; - public QpidAcceptor(NetworkTransport transport, String protocol) + public QpidAcceptor(NetworkDriver driver, String protocol) { - _transport = transport; + _driver = driver; _protocol = protocol; } - public NetworkTransport getNetworkTransport() + public NetworkDriver getNetworkDriver() { - return _transport; + return _driver; } public String toString() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 9b3673e8b7..d2addfde0c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -22,14 +22,8 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*; -import java.security.Principal; import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -41,40 +35,25 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionCloseCode; import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; -import org.apache.qpid.transport.Session; -public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder +public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject { private ConnectionConfig _config; private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private LogActor _actor = GenericActor.getInstance(this); - private Subject _authorizedSubject = null; - private Principal _authorizedPrincipal = null; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - public ServerConnection() { } - public UUID getId() - { - return _config.getId(); - } - @Override protected void invoke(Method method) { @@ -93,18 +72,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask.run(); } _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); - - getVirtualHost().getConnectionRegistry().registerConnection(this); - } - - if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) - { - if(_virtualHost != null) - { - _virtualHost.getConnectionRegistry().deregisterConnection(this); - } } - + if (state == State.CLOSED) { logClosed(); @@ -141,8 +110,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void setVirtualHost(VirtualHost virtualHost) { _virtualHost = virtualHost; - - initialiseStatistics(); } public void setConnectionConfig(final ConnectionConfig config) @@ -178,11 +145,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, ((ServerSession)session).close(); } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } @Override public void received(ProtocolEvent event) @@ -217,9 +179,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public String toLogString() { boolean hasVirtualHost = (null != this.getVirtualHost()); - boolean hasClientId = (null != getClientId()); + boolean hasPrincipal = (null != getAuthorizationID()); - if (hasClientId && hasVirtualHost) + if (hasPrincipal && hasVirtualHost) { return "[" + MessageFormat.format(CONNECTION_FORMAT, @@ -229,7 +191,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, getVirtualHost().getName()) + "] "; } - else if (hasClientId) + else if (hasPrincipal) { return "[" + MessageFormat.format(USER_FORMAT, @@ -253,130 +215,4 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _actor; } - - public void close(AMQConstant cause, String message) throws AMQException - { - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) - { - // Ignore - } - close(replyCode, message); - } - - public List<AMQSessionModel> getSessionModels() - { - List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); - for (Session ssn : getChannels()) - { - sessions.add((AMQSessionModel) ssn); - } - return sessions; - } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _virtualHost.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _virtualHost.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); - _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); - _dataReceived = new StatisticsCounter("data-received-" + getConnectionId()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } - - /** - * @return authorizedSubject - */ - public Subject getAuthorizedSubject() - { - return _authorizedSubject; - } - - /** - * Sets the authorized subject. It also extracts the UsernamePrincipal from the subject - * and caches it for optimisation purposes. - * - * @param authorizedSubject - */ - public void setAuthorizedSubject(final Subject authorizedSubject) - { - if (authorizedSubject == null) - { - _authorizedSubject = null; - _authorizedPrincipal = null; - } - else - { - _authorizedSubject = authorizedSubject; - _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject); - } - } - - public Principal getAuthorizedPrincipal() - { - return _authorizedPrincipal; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 27e199291d..2b9e92f685 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -20,30 +20,26 @@ */ package org.apache.qpid.server.transport; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - +import org.apache.qpid.transport.*; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.AuthenticationResult; -import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.*; + +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslException; +import java.util.*; public class ServerConnectionDelegate extends ServerDelegate { private String _localFQDN; private final IApplicationRegistry _appRegistry; + public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) { this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); @@ -72,6 +68,7 @@ public class ServerConnectionDelegate extends ServerDelegate return list; } + @Override public ServerSession getSession(Connection conn, SessionAttach atc) { SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); @@ -81,33 +78,14 @@ public class ServerConnectionDelegate extends ServerDelegate return ssn; } + @Override protected SaslServer createSaslServer(String mechanism) throws SaslException { return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN); } - protected void secure(final SaslServer ss, final Connection conn, final byte[] response) - { - final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response); - final ServerConnection sconn = (ServerConnection) conn; - - - if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus())) - { - tuneAuthorizedConnection(sconn); - sconn.setAuthorizedSubject(authResult.getSubject()); - } - else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus())) - { - connectionAuthContinue(sconn, authResult.getChallenge()); - } - else - { - connectionAuthFailed(sconn, authResult.getCause()); - } - } - + @Override public void connectionClose(Connection conn, ConnectionClose close) { try @@ -121,9 +99,10 @@ public class ServerConnectionDelegate extends ServerDelegate } + @Override public void connectionOpen(Connection conn, ConnectionOpen open) { - final ServerConnection sconn = (ServerConnection) conn; + ServerConnection sconn = (ServerConnection) conn; VirtualHost vhost; String vhostName; @@ -137,7 +116,7 @@ public class ServerConnectionDelegate extends ServerDelegate } vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); - SecurityManager.setThreadSubject(sconn.getAuthorizedSubject()); + SecurityManager.setThreadPrincipal(conn.getAuthorizationID()); if(vhost != null) { @@ -159,7 +138,6 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); sconn.setState(Connection.State.CLOSING); } - } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index e9168f71fb..540ad3fffd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,25 +23,9 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; - -import javax.security.auth.Subject; +import com.sun.security.auth.UserPrincipal; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { - private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -116,7 +111,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); + + private Principal _principal; private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -144,8 +140,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi super(connection, delegate, name, expiry); _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - - _reference = new WeakReference<Session>(this); + _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -164,8 +160,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - _transaction.enqueue(queues,message, new ServerTransaction.Action() + + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -193,7 +189,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi }); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } @@ -201,7 +196,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi Runnable postIdSettingAction) { invoke(xfr, postIdSettingAction); - getConnectionModel().registerMessageDelivered(xfr.getBodySize()); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -383,7 +377,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi entry.release(); } }); - updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -417,7 +410,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } finally { @@ -432,11 +425,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi // theory return !(_transaction instanceof AutoCommitTransaction); } - - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } public void selectTx() { @@ -483,17 +471,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - /** - * Update last transaction activity timestamp - */ - public void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - public Long getTxnStarts() { return _txnStarts.get(); @@ -514,14 +491,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return _txnCount.get(); } - public Principal getAuthorizedPrincipal() - { - return ((ServerConnection) getConnection()).getAuthorizedPrincipal(); - } - - public Subject getAuthorizedSubject() + public Principal getPrincipal() { - return ((ServerConnection) getConnection()).getAuthorizedSubject(); + return _principal; } public void addSessionCloseTask(Task task) @@ -634,38 +606,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return (LogSubject) this; } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); - _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - } - } - } - + @Override public String toLogString() { return "[" + @@ -676,5 +617,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi getVirtualHost().getName(), getChannel()) + "] "; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index f316d60c6a..42a3975e24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -25,23 +25,21 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeType; -import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.*; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; @@ -97,7 +95,6 @@ import org.apache.qpid.transport.TxSelect; public class ServerSessionDelegate extends SessionDelegate { - private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); private final IApplicationRegistry _appRegistry; public ServerSessionDelegate(IApplicationRegistry appRegistry) @@ -108,24 +105,16 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void command(Session session, Method method) { - try - { - setThreadSubject(session); + SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID()); - if(!session.isClosing()) + if(!session.isClosing()) + { + super.command(session, method); + if (method.isSync()) { - super.command(session, method); - if (method.isSync()) - { - session.flushProcessed(); - } + session.flushProcessed(); } } - catch(RuntimeException e) - { - LOGGER.error("Exception processing command", e); - exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e); - } } @Override @@ -134,6 +123,8 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).accept(method.getTransfers()); } + + @Override public void messageReject(Session session, MessageReject method) { @@ -212,33 +203,32 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) + else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { + if(queue.isExclusive()) { - ServerSession s = (ServerSession) session; - queue.setExclusiveOwningSession(s); - if(queue.getAuthorizationHolder() == null) + if(queue.getPrincipalHolder() == null) { - queue.setAuthorizationHolder(s); - queue.setExclusiveOwningSession(s); + queue.setPrincipalHolder((ServerSession)session); ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() { + public void doTask(ServerSession session) { - if(queue.getAuthorizationHolder() == session) + if(queue.getPrincipalHolder() == session) { - queue.setAuthorizationHolder(null); - queue.setExclusiveOwningSession(null); + queue.setPrincipalHolder(null); } } }); } + } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); @@ -379,6 +369,7 @@ public class ServerSessionDelegate extends SessionDelegate } ssn.processed(xfr); + } @Override @@ -398,7 +389,7 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).unregister(sub); if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) { - queue.setAuthorizationHolder(null); + queue.setPrincipalHolder(null); } } } @@ -457,19 +448,6 @@ public class ServerSessionDelegate extends SessionDelegate VirtualHost virtualHost = getVirtualHost(session); Exchange exchange = getExchange(session, exchangeName); - //we must check for any unsupported arguments present and throw not-implemented - if(method.hasArguments()) - { - Map<String,Object> args = method.getArguments(); - - //QPID-3392: currently we don't support any! - if(!args.isEmpty()) - { - exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString()); - return; - } - } - if(method.getPassive()) { if(exchange == null) @@ -479,6 +457,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { + // TODO - check exchange has same properties if(!exchange.getTypeShortString().toString().equals(method.getType())) { exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); @@ -990,10 +969,10 @@ public class ServerSessionDelegate extends SessionDelegate } - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + if(method.hasAutoDelete() + && method.getAutoDelete() + && method.hasExclusive() + && method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task deleteQueueTask = new ServerSession.Task() @@ -1020,23 +999,23 @@ public class ServerSessionDelegate extends SessionDelegate } }); } - if (method.hasExclusive() - && method.getExclusive()) + else if(method.getExclusive()) { final AMQQueue q = queue; final ServerSession.Task removeExclusive = new ServerSession.Task() { + public void doTask(ServerSession session) { - q.setAuthorizationHolder(null); + q.setPrincipalHolder(null); q.setExclusiveOwningSession(null); } }; final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException { s.removeSessionCloseTask(removeExclusive); @@ -1050,7 +1029,7 @@ public class ServerSessionDelegate extends SessionDelegate } } } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session))) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1098,7 +1077,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session) + if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -1244,8 +1223,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void closed(Session session) { - setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) { ((ServerSession)session).unregister(sub); @@ -1264,9 +1241,4 @@ public class ServerSessionDelegate extends SessionDelegate return ((ServerSession)session).getSubscriptions(); } - private void setThreadSubject(Session session) - { - final ServerConnection scon = (ServerConnection) session.getConnection(); - SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 36e9d78440..db781ead96 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -50,11 +50,6 @@ public class AutoCommitTransaction implements ServerTransaction _transactionLog = transactionLog; } - public long getTransactionStartTime() - { - return 0L; - } - /** * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 946dbd7c28..a04c743be1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -20,23 +20,18 @@ package org.apache.qpid.server.txn; * */ + import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLog; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A concrete implementation of ServerTransaction where enqueue/dequeue @@ -46,28 +41,17 @@ import org.slf4j.LoggerFactory; */ public class LocalTransaction implements ServerTransaction { - protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class); + protected static final Logger _logger = Logger.getLogger(LocalTransaction.class); private final List<Action> _postTransactionActions = new ArrayList<Action>(); private volatile TransactionLog.Transaction _transaction; private TransactionLog _transactionLog; - private long _txnStartTime = 0L; public LocalTransaction(TransactionLog transactionLog) { _transactionLog = transactionLog; } - - public boolean inTransaction() - { - return _transaction != null; - } - - public long getTransactionStartTime() - { - return _txnStartTime; - } public void addPostTransactionAction(Action postTransactionAction) { @@ -105,6 +89,7 @@ public class LocalTransaction implements ServerTransaction try { + for(QueueEntry entry : queueEntries) { ServerMessage message = entry.getMessage(); @@ -128,6 +113,7 @@ public class LocalTransaction implements ServerTransaction _logger.error("Error during message dequeues", e); tidyUpOnError(e); } + } private void tidyUpOnError(Exception e) @@ -154,7 +140,8 @@ public class LocalTransaction implements ServerTransaction } finally { - resetDetails(); + _transaction = null; + _postTransactionActions.clear(); } } @@ -206,11 +193,6 @@ public class LocalTransaction implements ServerTransaction { _postTransactionActions.add(postTransactionAction); - if (_txnStartTime == 0L) - { - _txnStartTime = System.currentTimeMillis(); - } - if(message.isPersistent()) { try @@ -266,14 +248,17 @@ public class LocalTransaction implements ServerTransaction } finally { - resetDetails(); + _transaction = null; + _postTransactionActions.clear(); } + } public void rollback() { try { + if(_transaction != null) { _transaction.abortTran(); @@ -295,15 +280,9 @@ public class LocalTransaction implements ServerTransaction } finally { - resetDetails(); + _transaction = null; + _postTransactionActions.clear(); } } } - - private void resetDetails() - { - _transaction = null; - _postTransactionActions.clear(); - _txnStartTime = 0L; - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index b3c6e1ac3a..b61b8a5c64 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -52,13 +52,6 @@ public interface ServerTransaction public void onRollback(); } - /** - * Return the time the current transaction started. - * - * @return the time this transaction started or 0 if not in a transaction - */ - long getTransactionStartTime(); - /** * Register an Action for execution after transaction commit or rollback. Actions * will be executed in the order in which they are registered. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java index ebace95f65..2db1944cd1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -63,10 +63,6 @@ public abstract class HouseKeepingTask implements Runnable { _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e); } - finally - { - CurrentActor.remove(); - } } public VirtualHost getVirtualHost() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 04f19b79bb..4ed0507228 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,28 +20,30 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.UUID; - import org.apache.qpid.common.Closeable; -import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.VirtualHostConfig; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.federation.BrokerLink; -import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfig; +import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.binding.BindingFactory; + +import java.util.List; +import java.util.UUID; +import java.util.TimerTask; +import java.util.concurrent.FutureTask; -public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer +public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable { IConnectionRegistry getConnectionRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 52acd9085b..6ec1c512e5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -24,18 +24,19 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -62,8 +63,6 @@ import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -72,7 +71,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; @@ -100,7 +99,7 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - private final AuthenticationManager _authenticationManager; + private AuthenticationManager _authenticationManager; private SecurityManager _securityManager; @@ -112,8 +111,6 @@ public class VirtualHostImpl implements VirtualHost private BrokerConfig _broker; private UUID _id; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); @@ -164,12 +161,12 @@ public class VirtualHostImpl implements VirtualHost public String getObjectInstanceName() { - return ObjectName.quote(_name); + return _name.toString(); } public String getName() { - return _name; + return _name.toString(); } public VirtualHostImpl getVirtualHost() @@ -247,13 +244,11 @@ public class VirtualHostImpl implements VirtualHost initialiseMessageStore(hostConfig); } - _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); + _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); - - initialiseStatistics(); } private void initialiseHouseKeeping(long period) @@ -286,30 +281,19 @@ public class VirtualHostImpl implements VirtualHost // house keeping task from running. } } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } } } scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + class ForceChannelClosuresTask extends TimerTask + { + public void run() + { + _connectionRegistry.expireClosedChannels(); + } + } + Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -643,80 +627,6 @@ public class VirtualHostImpl implements VirtualHost { return _bindingFactory; } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - _appRegistry.registerMessageDelivered(messageSize); - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - _appRegistry.registerMessageReceived(messageSize, timestamp); - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (AMQConnectionModel connection : _connectionRegistry.getConnections()) - { - connection.resetStatistics(); - } - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); - _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); - _messagesReceived = new StatisticsCounter("messages-received-" + getName()); - _dataReceived = new StatisticsCounter("bytes-received-" + getName()); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } public void createBrokerConnection(final String transport, final String host, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java index 2c0ceed80b..dca165fa7e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java @@ -192,7 +192,7 @@ public class MessageStoreTool if (_initialised) { - ApplicationRegistry.remove(); + ApplicationRegistry.remove(1); } _console.println("...exiting"); @@ -274,7 +274,7 @@ public class MessageStoreTool { ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile); - ApplicationRegistry.remove(); + ApplicationRegistry.remove(1); ApplicationRegistry.initialise(registry); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 806e161bbc..4fd4999b19 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -364,7 +364,7 @@ public class Show extends AbstractCommand { if(msg instanceof AMQMessage) { - headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties()); + headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties); } } catch (AMQException e) |