diff options
Diffstat (limited to 'qpid/java/broker')
27 files changed, 355 insertions, 188 deletions
diff --git a/qpid/java/broker/bin/qpid-server.bat b/qpid/java/broker/bin/qpid-server.bat index af543decb3..6b7bbcb96e 100644 --- a/qpid/java/broker/bin/qpid-server.bat +++ b/qpid/java/broker/bin/qpid-server.bat @@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info REM Set the default system properties that we'll use now that they have
REM all been initialised
-set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%"
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
@@ -77,7 +77,7 @@ goto afterQpidClasspath :noQpidClasspath
echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar
-set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\*
:afterQpidClasspath
REM start parsing -run arguments
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index b15a7fd02e..6ea2b9a63e 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -19,10 +19,10 @@ - --> <project name="AMQ Broker" default="build"> - <property name="module.depends" value="management/common common amqp-1-0-common"/> <property name="module.test.depends" value="common/test" /> <property name="module.main" value="org.apache.qpid.server.Main"/> + <property name="module.genpom" value="true"/> <import file="../module.xml"/> @@ -84,4 +84,5 @@ <target name="release-bin" depends="release-bin-tasks"/> + <target name="bundle" depends="bundle-tasks"/> </project> diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml index 2752274155..d18e1392e6 100644 --- a/qpid/java/broker/etc/config.xml +++ b/qpid/java/broker/etc/config.xml @@ -35,8 +35,8 @@ <enabled>false</enabled> <port>5671</port> <sslOnly>false</sslOnly> - <keystorePath>/path/to/keystore.ks</keystorePath> - <keystorePassword>keystorepass</keystorePassword> + <keyStorePath>/path/to/keystore.ks</keyStorePath> + <keyStorePassword>keystorepass</keyStorePassword> </ssl> <port>5672</port> <socketReceiveBuffer>262144</socketReceiveBuffer> diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index 33a48a1349..4dcdcda6d2 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -25,8 +25,9 @@ <name>localhost</name> <localhost> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore - </class> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> <housekeeping> @@ -85,8 +86,9 @@ <name>development</name> <development> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore - </class> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> <queues> @@ -123,8 +125,9 @@ <name>test</name> <test> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore - </class> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${QPID_WORK}/derbystore</environment-path>--> </store> <queues> diff --git a/qpid/java/broker/src/main/java/broker.bnd b/qpid/java/broker/src/main/java/broker.bnd new file mode 100755 index 0000000000..25b0495a63 --- /dev/null +++ b/qpid/java/broker/src/main/java/broker.bnd @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ver: 0.15.0 + +Bundle-SymbolicName: qpid-broker +Bundle-Version: ${ver} +Export-Package: *;version=${ver} +Bundle-RequiredExecutionEnvironment: J2SE-1.5 + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 9c77032d4c..6abef6fd6b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -694,7 +694,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory, final String srcQueue, final String destQueue, - final Long qty) + final Long qty, + final Map filter) // TODO: move based on group identifier { // TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); @@ -712,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory, + final java.lang.Boolean receive) + { + // TODO: timestamp support + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, final String type, final String name, @@ -731,6 +745,14 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory, + final String type, + final String name) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public UUID getId() { return _obj.getId(); @@ -1102,7 +1124,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable } public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, - final Long request) + final Long request, + final Map filter) // TODO: support for purge-by-group-identifier { try { @@ -1118,7 +1141,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory, final Long request, final Boolean useAltExchange, - final String exchange) + final String exchange, + final Map filter) // TODO: support for re-route-by-group-identifier { //TODO return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index c337c4db75..5c1814590c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -218,9 +218,9 @@ public class Broker if (serverConfig.getEnableSSL()) { - final String keystorePath = serverConfig.getKeystorePath(); - final String keystorePassword = serverConfig.getKeystorePassword(); - final String certType = serverConfig.getCertType(); + final String keystorePath = serverConfig.getConnectorKeyStorePath(); + final String keystorePassword = serverConfig.getConnectorKeyStorePassword(); + final String certType = serverConfig.getConnectorCertType(); final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType); for(int sslPort : sslPorts) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java index 5cdb886821..7dffc2d3c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.configuration; +import java.util.List; + public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerConfig> { @@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC String getDataDirectory(); + String getFederationTag(); + + /** + * List of feature(s) to be advertised to clients on connection. + * Feature names are strings, beginning with qpid. followed by more or more + * words separated by minus signs e.g. qpid.jms-selector. + * + * If there are no features, this method must return an empty array. + * + * @return list of feature names + */ + List<String> getFeatures(); + void addVirtualHost(VirtualHostConfig virtualHost); void createBrokerConnection(String transport, @@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC String authMechanism, String username, String password); - String getFederationTag(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java index 82b2fc82d2..e1cf87277b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.configuration; import java.util.*; -import java.io.File; public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, BrokerConfig> { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 9ca916a633..0d347873c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -123,7 +123,7 @@ public class ServerConfiguration extends ConfigurationPlugin * Configuration Manager to be initialised in the Application Registry. * <p> * If using this ServerConfiguration via an ApplicationRegistry there is no - * need to explictly call {@link #initialise()} as this is done via the + * need to explicitly call {@link #initialise()} as this is done via the * {@link ApplicationRegistry#initialise()} method. * * @param configurationURL @@ -169,7 +169,7 @@ public class ServerConfiguration extends ConfigurationPlugin * Configuration Manager to be initialised in the Application Registry. * <p> * If using this ServerConfiguration via an ApplicationRegistry there is no - * need to explictly call {@link #initialise()} as this is done via the + * need to explicitly call {@link #initialise()} as this is done via the * {@link ApplicationRegistry#initialise()} method. * * @param conf @@ -239,6 +239,22 @@ public class ServerConfiguration extends ConfigurationPlugin + (_configFile == null ? "" : " Configuration file : " + _configFile); throw new ConfigurationException(message); } + + // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration + // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used. + for (String key : new String[] {"management.ssl.keystorePath", + "management.ssl.keystorePassword," + + "connector.ssl.keystorePath", + "connector.ssl.keystorePassword"}) + { + if (contains(key)) + { + final String deprecatedXpath = key.replaceAll("\\.", "/"); + final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore"); + _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath + + (_configFile == null ? "" : " Configuration file : " + _configFile)); + } + } } /* @@ -404,7 +420,7 @@ public class ServerConfiguration extends ConfigurationPlugin public final static Configuration flatConfig(File file) throws ConfigurationException { // We have to override the interpolate methods so that - // interpolation takes place accross the entirety of the + // interpolation takes place across the entirety of the // composite configuration. Without doing this each // configuration object only interpolates variables defined // inside itself. @@ -551,7 +567,8 @@ public class ServerConfiguration extends ConfigurationPlugin public String getManagementKeyStorePath() { - return getStringValue("management.ssl.keyStorePath"); + final String fallback = getStringValue("management.ssl.keystorePath"); + return getStringValue("management.ssl.keyStorePath", fallback); } public boolean getManagementSSLEnabled() @@ -561,7 +578,8 @@ public class ServerConfiguration extends ConfigurationPlugin public String getManagementKeyStorePassword() { - return getStringValue("management.ssl.keyStorePassword"); + final String fallback = getStringValue("management.ssl.keystorePassword"); + return getStringValue("management.ssl.keyStorePassword", fallback); } public boolean getQueueAutoRegister() @@ -699,17 +717,19 @@ public class ServerConfiguration extends ConfigurationPlugin return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT)); } - public String getKeystorePath() + public String getConnectorKeyStorePath() { - return getStringValue("connector.ssl.keystorePath"); + final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name. + return getStringValue("connector.ssl.keyStorePath", fallback); } - public String getKeystorePassword() + public String getConnectorKeyStorePassword() { - return getStringValue("connector.ssl.keystorePassword"); + final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name. + return getStringValue("connector.ssl.keyStorePassword", fallback); } - public String getCertType() + public String getConnectorCertType() { return getStringValue("connector.ssl.certType", "SunX509"); } @@ -773,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin { return getIntValue("maximumChannelCount", 256); } + + /** + * List of Broker features that have been disabled within configuration. Disabled + * features won't be advertised to the clients on connection. + * + * @return list of disabled features, or empty list if no features are disabled. + */ + public List<String> getDisabledFeatures() + { + final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList()); + return disabledFeatures; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f21158cd0c..f330e2f708 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.federation; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism); final Map<String,Object> serverProps = _qpidConnection.getServerProperties(); - _remoteFederationTag = (String) serverProps.get("qpid.federation_tag"); + _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG); if(_remoteFederationTag == null) { _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index f16e30ef92..0e0b18aa2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.queue.AMQQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage implements ServerMessage<AMQMessage> +public class AMQMessage extends AbstractServerMessageImpl { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); - private final AtomicInteger _referenceCount = new AtomicInteger(0); - /** Flag to indicate that this message requires 'immediate' delivery. */ private static final byte IMMEDIATE = 0x01; @@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage<AMQMessage> public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef) { + super(handle); + _handle = handle; final MessageMetaData metaData = handle.getMetaData(); _size = metaData.getContentSize(); @@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage<AMQMessage> _channelRef = channelRef; } - - public String debugIdentity() - { - return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")"; - } - public void setExpiration(final long expiration) { @@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage<AMQMessage> } - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } - public MessageMetaData getMessageMetaData() { return _handle.getMetaData(); @@ -117,88 +105,12 @@ public class AMQMessage implements ServerMessage<AMQMessage> return getMessageMetaData().getContentHeaderBody(); } - - public Long getMessageId() { return _handle.getMessageNumber(); } /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic - * operation. - */ - public AMQMessage takeReference() - { - incrementReference(); // _referenceCount.incrementAndGet(); - - return this; - } - - public boolean incrementReference() - { - return incrementReference(1); - } - - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - - if(_referenceCount.addAndGet(count) <= 0) - { - _referenceCount.addAndGet(-count); - return false; - } - else - { - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * - * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference() - { - int count = _referenceCount.decrementAndGet(); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE/2); - - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_handle != null) - { - _handle.remove(); - - } - } - else - { - if (count < 0) - { - throw new RuntimeException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - - - /** * Called selectors to determin if the message has already been sent * * @return _deliveredToConsumer @@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage<AMQMessage> public String toString() { - // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - // _taken + " by :" + _takenBySubcription; - - return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount; + return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount(); } public int getContent(ByteBuffer buf, int offset) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java new file mode 100644 index 0000000000..186bb8601c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -0,0 +1,84 @@ +package org.apache.qpid.server.message; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.store.StoredMessage; + +public abstract class AbstractServerMessageImpl implements ServerMessage +{ + private final AtomicInteger _referenceCount = new AtomicInteger(0); + private final StoredMessage<?> _handle; + + public AbstractServerMessageImpl(StoredMessage<?> handle) + { + _handle = handle; + } + + public boolean incrementReference() + { + return incrementReference(1); + } + + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 0) + { + _referenceCount.addAndGet(-count); + return false; + } + else + { + return true; + } + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * + * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference() + { + int count = _referenceCount.decrementAndGet(); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _referenceCount.set(Integer.MIN_VALUE/2); + + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_handle != null) + { + _handle.remove(); + } + } + else + { + if (count < 0) + { + throw new RuntimeException("Reference count for message id " + debugIdentity() + + " has gone below 0."); + } + } + } + + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")"; + } + + protected int getReferenceCount() + { + return _referenceCount.get(); + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java index 2297e4200d..f9863f4945 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -44,7 +44,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes private int _bodySize; private volatile SoftReference<ByteBuffer> _body; - private static final int ENCODER_SIZE = 1 << 16; + private static final int ENCODER_SIZE = 1 << 10; public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java index af78042a63..1a230a2590 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java @@ -29,18 +29,14 @@ import java.nio.ByteBuffer; import java.lang.ref.WeakReference; -public class MessageTransferMessage implements InboundMessage, ServerMessage<MessageTransferMessage> +public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage { - - private StoredMessage<MessageMetaData_0_10> _storeMessage; - - private WeakReference<Session> _sessionRef; public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef) { - + super(storeMessage); _storeMessage = storeMessage; _sessionRef = sessionRef; } @@ -145,5 +141,4 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage<Mes { return _sessionRef == null ? null : (ServerSession) _sessionRef.get(); } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java index ed189c49c4..cb44f80b91 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java @@ -29,11 +29,11 @@ public class TransferMessageReference extends MessageReference<MessageTransferMe protected void onReference(MessageTransferMessage message) { - + message.incrementReference(); } protected void onRelease(MessageTransferMessage message) { - + message.decrementReference(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index b51e6aff1a..bc0d4e3bcc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -72,5 +72,5 @@ public interface AMQConnectionModel extends StatisticsGatherer public String getUserName(); - public boolean isSessionNameUnique(String name); + public boolean isSessionNameUnique(byte[] name); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index bff0a79de1..b960ce8608 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1396,8 +1396,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } @Override - public boolean isSessionNameUnique(String name) + public boolean isSessionNameUnique(byte[] name) { + // 0-8/0-9/0-9-1 sessions don't have names return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 108533ef96..6a36b22400 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -23,7 +23,10 @@ package org.apache.qpid.server.registry; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.common.ServerPropertyNames; +import java.util.ArrayList; +import java.util.Collections; import java.util.UUID; import java.util.List; import java.util.Map; @@ -158,4 +161,19 @@ public class BrokerConfigAdapter implements BrokerConfig { return _federationTag; } + + /** + * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures() + */ + @Override + public List<String> getFeatures() + { + final List<String> features = new ArrayList<String>(); + if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR)) + { + features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + } + + return Collections.unmodifiableList(features); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 92f21b8b1c..adb0a84151 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public void queueDeleted(AMQQueue queue) { _deleted.set(true); -// _channel.queueDeleted(queue); } public boolean filtersMessages() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index d83013afba..e18b453db3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void close(AMQConstant cause, String message) throws AMQException { + closeSubscriptions(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; try { - replyCode = ConnectionCloseCode.get(cause.getCode()); + replyCode = ConnectionCloseCode.get(cause.getCode()); } catch (IllegalArgumentException iae) { @@ -389,7 +390,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override - public boolean isSessionNameUnique(String name) + public boolean isSessionNameUnique(byte[] name) { return !super.hasSessionWithName(name); } @@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { return _authorizedPrincipal.getName(); } + + @Override + public void closed() + { + closeSubscriptions(); + super.closed(); + } + + private void closeSubscriptions() + { + for (Session ssn : getChannels()) + { + ((ServerSession)ssn).unregisterSubscriptions(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 2de8a0425e..8d6e0e0d80 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -32,7 +32,9 @@ import java.util.StringTokenizer; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached; public class ServerConnectionDelegate extends ServerDelegate { - private String _localFQDN; + private final String _localFQDN; private final IApplicationRegistry _appRegistry; public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) { - this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); + this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); } - public ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, IApplicationRegistry appRegistry, @@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; } + private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig) + { + final Map<String,Object> map = new HashMap<String,Object>(2); + map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag()); + final List<String> features = brokerConfig.getFeatures(); + if (features != null && features.size() > 0) + { + map.put(ServerPropertyNames.QPID_FEATURES, features); + } + return map; + } + private static List<Object> parseToList(String mechanisms) { List<Object> list = new ArrayList<Object>(); @@ -234,22 +247,22 @@ public class ServerConnectionDelegate extends ServerDelegate @Override public void sessionAttach(final Connection conn, final SessionAttach atc) { - final String clientId = new String(atc.getName()); - final Session ssn = getSession(conn, atc); + final Session ssn; - if(isSessionNameUnique(clientId,conn)) + if(isSessionNameUnique(atc.getName(), conn)) { + ssn = sessionAttachImpl(conn, atc); conn.registerSession(ssn); - super.sessionAttach(conn, atc); } else { + ssn = getSession(conn, atc); ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY)); ssn.closed(); } } - private boolean isSessionNameUnique(final String name, final Connection conn) + private boolean isSessionNameUnique(final byte[] name, final Connection conn) { final ServerConnection sconn = (ServerConnection) conn; final String userId = sconn.getUserName(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 28dec2ad28..429cc4cf66 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -172,6 +173,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void postCommit() { + MessageReference<?> ref = message.newReference(); for(int i = 0; i < _queues.length; i++) { try @@ -184,6 +186,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi throw new RuntimeException(e); } } + ref.release(); } public void onRollback() @@ -412,12 +415,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { queue.unregisterSubscription(sub); } - } catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + _logger.error("Failed to unregister subscription :" + e.getMessage(), e); } finally { @@ -683,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session + unregisterSubscriptions(); + + super.close(); + } + + void unregisterSubscriptions() + { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } - - super.close(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 17bd06538f..c87919b478 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate { setThreadSubject(session); - for(Subscription_0_10 sub : getSubscriptions(session)) - { - ((ServerSession)session).unregister(sub); - } - ((ServerSession)session).onClose(); + ServerSession serverSession = (ServerSession)session; + + serverSession.unregisterSubscriptions(); + serverSession.onClose(); } @Override @@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate closed(session); } - public Collection<Subscription_0_10> getSubscriptions(Session session) - { - return ((ServerSession)session).getSubscriptions(); - } - private void setThreadSubject(Session session) { final ServerConnection scon = (ServerConnection) session.getConnection(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index d368a2d1ee..9afd2a45a9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -256,7 +256,7 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals(false, _serverConfig.getManagementSSLEnabled()); } - public void testGetManagementKeyStorePassword() throws ConfigurationException + public void testGetManagementKeystorePassword() throws ConfigurationException { // Check default _serverConfig.initialise(); @@ -534,43 +534,57 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals("10", _serverConfig.getSSLPorts().get(0)); } - public void testGetKeystorePath() throws ConfigurationException + public void testGetConnectorKeystorePath() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertNull(_serverConfig.getKeystorePath()); + assertNull(_serverConfig.getConnectorKeyStorePath()); // Check value we set - _config.setProperty("connector.ssl.keystorePath", "a"); + _config.setProperty("connector.ssl.keyStorePath", "a"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getKeystorePath()); + assertEquals("a", _serverConfig.getConnectorKeyStorePath()); + + // Ensure we continue to support the old name keystorePath + _config.clearProperty("connector.ssl.keyStorePath"); + _config.setProperty("connector.ssl.keystorePath", "b"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("b", _serverConfig.getConnectorKeyStorePath()); } - public void testGetKeystorePassword() throws ConfigurationException + public void testGetConnectorKeystorePassword() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertNull(_serverConfig.getKeystorePassword()); + assertNull(_serverConfig.getConnectorKeyStorePassword()); // Check value we set - _config.setProperty("connector.ssl.keystorePassword", "a"); + _config.setProperty("connector.ssl.keyStorePassword", "a"); + _serverConfig = new ServerConfiguration(_config); + _serverConfig.initialise(); + assertEquals("a", _serverConfig.getConnectorKeyStorePassword()); + + // Ensure we continue to support the old name keystorePassword + _config.clearProperty("connector.ssl.keyStorePassword"); + _config.setProperty("connector.ssl.keystorePassword", "b"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getKeystorePassword()); + assertEquals("b", _serverConfig.getConnectorKeyStorePassword()); } - public void testGetCertType() throws ConfigurationException + public void testGetConnectorCertType() throws ConfigurationException { // Check default _serverConfig.initialise(); - assertEquals("SunX509", _serverConfig.getCertType()); + assertEquals("SunX509", _serverConfig.getConnectorCertType()); // Check value we set _config.setProperty("connector.ssl.certType", "a"); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals("a", _serverConfig.getCertType()); + assertEquals("a", _serverConfig.getConnectorCertType()); } public void testGetUseBiasedWrites() throws ConfigurationException @@ -1284,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase } /** - * Test that a non-existant virtualhost file throws a {@link ConfigurationException}. + * Test that a non-existent virtualhost file throws a {@link ConfigurationException}. * <p> * Test for QPID-2624 */ @@ -1312,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** + * Tests that element disabledFeatures allows features that would + * otherwise be advertised by the broker to be turned off. + */ + public void testDisabledFeatures() throws ConfigurationException + { + // Check default + _serverConfig.initialise(); + _serverConfig = new ServerConfiguration(_config); + assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size()); + + // Check value we set + _config.addProperty("disabledFeatures", "qpid.feature1"); + _config.addProperty("disabledFeatures", "qpid.feature2"); + _serverConfig = new ServerConfiguration(_config); + + assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size()); + assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1")); + } + + /** * Tests that the old element security.jmx.access (that used to be used * to define JMX access rights) is rejected. */ @@ -1338,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.jmx.principal-database (that used to define the * principal database used for JMX authentication) is rejected. */ @@ -1364,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.principal-databases. ... (that used to define * principal databases) is rejected. */ @@ -1389,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was * replaced by housekeeping.checkPeriod) is rejected. */ diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java index 4a03445357..f2249c5931 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java @@ -72,11 +72,11 @@ public class OsgiSystemPackageUtilTest extends QpidTestCase _map.put("org.apache.qpid.xyz", "1.0.0"); _map.put("org.abc", "1.2.3"); - _util = new OsgiSystemPackageUtil(new Version("0.13"), _map); + _util = new OsgiSystemPackageUtil(new Version("0.15"), _map); final String systemPackageString = _util.getFormattedSystemPackageString(); - assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.13.0", systemPackageString); + assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.15.0", systemPackageString); } public void testWithQpidPackageWithoutQpidReleaseNumberSet() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index 2d41eb9899..2ffa157ca8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -86,11 +86,7 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message = message.takeReference(); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - + message.incrementReference(); assertEquals(1, _store.getMessageCount()); message.decrementReference(); @@ -146,12 +142,12 @@ public class ReferenceCountingTest extends QpidTestCase AMQMessage message = new AMQMessage(storedMessage); - message = message.takeReference(); + message.incrementReference(); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, _store.getMessageCount()); - message = message.takeReference(); + message.incrementReference(); message.decrementReference(); assertEquals(1, _store.getMessageCount()); } |