summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/bin/qpid-server.bat4
-rw-r--r--qpid/java/broker/build.xml3
-rw-r--r--qpid/java/broker/etc/config.xml4
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml15
-rwxr-xr-xqpid/java/broker/src/main/java/broker.bnd26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java99
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java84
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java68
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java10
27 files changed, 355 insertions, 188 deletions
diff --git a/qpid/java/broker/bin/qpid-server.bat b/qpid/java/broker/bin/qpid-server.bat
index af543decb3..6b7bbcb96e 100644
--- a/qpid/java/broker/bin/qpid-server.bat
+++ b/qpid/java/broker/bin/qpid-server.bat
@@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ_LOGGING_LEVEL=info
REM Set the default system properties that we'll use now that they have
REM all been initialised
-set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%"
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
@@ -77,7 +77,7 @@ goto afterQpidClasspath
:noQpidClasspath
echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar
-set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\*
:afterQpidClasspath
REM start parsing -run arguments
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index b15a7fd02e..6ea2b9a63e 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -19,10 +19,10 @@
-
-->
<project name="AMQ Broker" default="build">
-
<property name="module.depends" value="management/common common amqp-1-0-common"/>
<property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
+ <property name="module.genpom" value="true"/>
<import file="../module.xml"/>
@@ -84,4 +84,5 @@
<target name="release-bin" depends="release-bin-tasks"/>
+ <target name="bundle" depends="bundle-tasks"/>
</project>
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index 2752274155..d18e1392e6 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -35,8 +35,8 @@
<enabled>false</enabled>
<port>5671</port>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<socketReceiveBuffer>262144</socketReceiveBuffer>
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index 33a48a1349..4dcdcda6d2 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -25,8 +25,9 @@
<name>localhost</name>
<localhost>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<housekeeping>
@@ -85,8 +86,9 @@
<name>development</name>
<development>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<queues>
@@ -123,8 +125,9 @@
<name>test</name>
<test>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore
- </class>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
<queues>
diff --git a/qpid/java/broker/src/main/java/broker.bnd b/qpid/java/broker/src/main/java/broker.bnd
new file mode 100755
index 0000000000..25b0495a63
--- /dev/null
+++ b/qpid/java/broker/src/main/java/broker.bnd
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.15.0
+
+Bundle-SymbolicName: qpid-broker
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
+Bundle-RequiredExecutionEnvironment: J2SE-1.5
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 9c77032d4c..6abef6fd6b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -694,7 +694,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
final String srcQueue,
final String destQueue,
- final Long qty)
+ final Long qty,
+ final Map filter) // TODO: move based on group identifier
{
// TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -712,6 +713,19 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+ final java.lang.Boolean receive)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
final String type,
final String name,
@@ -731,6 +745,14 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory,
+ final String type,
+ final String name)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
@@ -1102,7 +1124,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
}
public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
- final Long request)
+ final Long request,
+ final Map filter) // TODO: support for purge-by-group-identifier
{
try
{
@@ -1118,7 +1141,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory,
final Long request,
final Boolean useAltExchange,
- final String exchange)
+ final String exchange,
+ final Map filter) // TODO: support for re-route-by-group-identifier
{
//TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index c337c4db75..5c1814590c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -218,9 +218,9 @@ public class Broker
if (serverConfig.getEnableSSL())
{
- final String keystorePath = serverConfig.getKeystorePath();
- final String keystorePassword = serverConfig.getKeystorePassword();
- final String certType = serverConfig.getCertType();
+ final String keystorePath = serverConfig.getConnectorKeyStorePath();
+ final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+ final String certType = serverConfig.getConnectorCertType();
final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
for(int sslPort : sslPorts)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
index 5cdb886821..7dffc2d3c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.configuration;
+import java.util.List;
+
public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerConfig>
{
@@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String getDataDirectory();
+ String getFederationTag();
+
+ /**
+ * List of feature(s) to be advertised to clients on connection.
+ * Feature names are strings, beginning with qpid. followed by more or more
+ * words separated by minus signs e.g. qpid.jms-selector.
+ *
+ * If there are no features, this method must return an empty array.
+ *
+ * @return list of feature names
+ */
+ List<String> getFeatures();
+
void addVirtualHost(VirtualHostConfig virtualHost);
void createBrokerConnection(String transport,
@@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String authMechanism,
String username, String password);
- String getFederationTag();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
index 82b2fc82d2..e1cf87277b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
@@ -22,7 +22,6 @@
package org.apache.qpid.server.configuration;
import java.util.*;
-import java.io.File;
public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, BrokerConfig>
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 9ca916a633..0d347873c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -123,7 +123,7 @@ public class ServerConfiguration extends ConfigurationPlugin
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param configurationURL
@@ -169,7 +169,7 @@ public class ServerConfiguration extends ConfigurationPlugin
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param conf
@@ -239,6 +239,22 @@ public class ServerConfiguration extends ConfigurationPlugin
+ (_configFile == null ? "" : " Configuration file : " + _configFile);
throw new ConfigurationException(message);
}
+
+ // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
+ // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
+ for (String key : new String[] {"management.ssl.keystorePath",
+ "management.ssl.keystorePassword," +
+ "connector.ssl.keystorePath",
+ "connector.ssl.keystorePassword"})
+ {
+ if (contains(key))
+ {
+ final String deprecatedXpath = key.replaceAll("\\.", "/");
+ final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
+ _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
+ + (_configFile == null ? "" : " Configuration file : " + _configFile));
+ }
+ }
}
/*
@@ -404,7 +420,7 @@ public class ServerConfiguration extends ConfigurationPlugin
public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
- // interpolation takes place accross the entirety of the
+ // interpolation takes place across the entirety of the
// composite configuration. Without doing this each
// configuration object only interpolates variables defined
// inside itself.
@@ -551,7 +567,8 @@ public class ServerConfiguration extends ConfigurationPlugin
public String getManagementKeyStorePath()
{
- return getStringValue("management.ssl.keyStorePath");
+ final String fallback = getStringValue("management.ssl.keystorePath");
+ return getStringValue("management.ssl.keyStorePath", fallback);
}
public boolean getManagementSSLEnabled()
@@ -561,7 +578,8 @@ public class ServerConfiguration extends ConfigurationPlugin
public String getManagementKeyStorePassword()
{
- return getStringValue("management.ssl.keyStorePassword");
+ final String fallback = getStringValue("management.ssl.keystorePassword");
+ return getStringValue("management.ssl.keyStorePassword", fallback);
}
public boolean getQueueAutoRegister()
@@ -699,17 +717,19 @@ public class ServerConfiguration extends ConfigurationPlugin
return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
}
- public String getKeystorePath()
+ public String getConnectorKeyStorePath()
{
- return getStringValue("connector.ssl.keystorePath");
+ final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
+ return getStringValue("connector.ssl.keyStorePath", fallback);
}
- public String getKeystorePassword()
+ public String getConnectorKeyStorePassword()
{
- return getStringValue("connector.ssl.keystorePassword");
+ final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
+ return getStringValue("connector.ssl.keyStorePassword", fallback);
}
- public String getCertType()
+ public String getConnectorCertType()
{
return getStringValue("connector.ssl.certType", "SunX509");
}
@@ -773,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin
{
return getIntValue("maximumChannelCount", 256);
}
+
+ /**
+ * List of Broker features that have been disabled within configuration. Disabled
+ * features won't be advertised to the clients on connection.
+ *
+ * @return list of disabled features, or empty list if no features are disabled.
+ */
+ public List<String> getDisabledFeatures()
+ {
+ final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList());
+ return disabledFeatures;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index f21158cd0c..f330e2f708 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.federation;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
_qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
- _remoteFederationTag = (String) serverProps.get("qpid.federation_tag");
+ _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
if(_remoteFederationTag == null)
{
_remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index f16e30ef92..0e0b18aa2f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -30,20 +30,17 @@ import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage implements ServerMessage<AMQMessage>
+public class AMQMessage extends AbstractServerMessageImpl
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(0);
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -76,6 +73,8 @@ public class AMQMessage implements ServerMessage<AMQMessage>
public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
{
+ super(handle);
+
_handle = handle;
final MessageMetaData metaData = handle.getMetaData();
_size = metaData.getContentSize();
@@ -89,12 +88,6 @@ public class AMQMessage implements ServerMessage<AMQMessage>
_channelRef = channelRef;
}
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
- }
-
public void setExpiration(final long expiration)
{
@@ -102,11 +95,6 @@ public class AMQMessage implements ServerMessage<AMQMessage>
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public MessageMetaData getMessageMetaData()
{
return _handle.getMetaData();
@@ -117,88 +105,12 @@ public class AMQMessage implements ServerMessage<AMQMessage>
return getMessageMetaData().getContentHeaderBody();
}
-
-
public Long getMessageId()
{
return _handle.getMessageNumber();
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
-
- if(_referenceCount.addAndGet(count) <= 0)
- {
- _referenceCount.addAndGet(-count);
- return false;
- }
- else
- {
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- *
- * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference()
- {
- int count = _referenceCount.decrementAndGet();
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_handle != null)
- {
- _handle.remove();
-
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
-
- /**
* Called selectors to determin if the message has already been sent
*
* @return _deliveredToConsumer
@@ -323,10 +235,7 @@ public class AMQMessage implements ServerMessage<AMQMessage>
public String toString()
{
- // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- // _taken + " by :" + _takenBySubcription;
-
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
}
public int getContent(ByteBuffer buf, int offset)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
new file mode 100644
index 0000000000..186bb8601c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -0,0 +1,84 @@
+package org.apache.qpid.server.message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.StoredMessage;
+
+public abstract class AbstractServerMessageImpl implements ServerMessage
+{
+ private final AtomicInteger _referenceCount = new AtomicInteger(0);
+ private final StoredMessage<?> _handle;
+
+ public AbstractServerMessageImpl(StoredMessage<?> handle)
+ {
+ _handle = handle;
+ }
+
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 0)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _referenceCount.decrementAndGet();
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_handle != null)
+ {
+ _handle.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
+ }
+ }
+ }
+
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
+ }
+
+ protected int getReferenceCount()
+ {
+ return _referenceCount.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
index 2297e4200d..f9863f4945 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
@@ -44,7 +44,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
private int _bodySize;
private volatile SoftReference<ByteBuffer> _body;
- private static final int ENCODER_SIZE = 1 << 16;
+ private static final int ENCODER_SIZE = 1 << 10;
public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index af78042a63..1a230a2590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -29,18 +29,14 @@ import java.nio.ByteBuffer;
import java.lang.ref.WeakReference;
-public class MessageTransferMessage implements InboundMessage, ServerMessage<MessageTransferMessage>
+public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
{
-
-
private StoredMessage<MessageMetaData_0_10> _storeMessage;
-
-
private WeakReference<Session> _sessionRef;
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
{
-
+ super(storeMessage);
_storeMessage = storeMessage;
_sessionRef = sessionRef;
}
@@ -145,5 +141,4 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage<Mes
{
return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
index ed189c49c4..cb44f80b91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
@@ -29,11 +29,11 @@ public class TransferMessageReference extends MessageReference<MessageTransferMe
protected void onReference(MessageTransferMessage message)
{
-
+ message.incrementReference();
}
protected void onRelease(MessageTransferMessage message)
{
-
+ message.decrementReference();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index b51e6aff1a..bc0d4e3bcc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -72,5 +72,5 @@ public interface AMQConnectionModel extends StatisticsGatherer
public String getUserName();
- public boolean isSessionNameUnique(String name);
+ public boolean isSessionNameUnique(byte[] name);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index bff0a79de1..b960ce8608 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1396,8 +1396,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
@Override
- public boolean isSessionNameUnique(String name)
+ public boolean isSessionNameUnique(byte[] name)
{
+ // 0-8/0-9/0-9-1 sessions don't have names
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
index 108533ef96..6a36b22400 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
@@ -23,7 +23,10 @@ package org.apache.qpid.server.registry;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.UUID;
import java.util.List;
import java.util.Map;
@@ -158,4 +161,19 @@ public class BrokerConfigAdapter implements BrokerConfig
{
return _federationTag;
}
+
+ /**
+ * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
+ */
+ @Override
+ public List<String> getFeatures()
+ {
+ final List<String> features = new ArrayList<String>();
+ if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+ {
+ features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ }
+
+ return Collections.unmodifiableList(features);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 92f21b8b1c..adb0a84151 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -460,7 +460,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public void queueDeleted(AMQQueue queue)
{
_deleted.set(true);
-// _channel.queueDeleted(queue);
}
public boolean filtersMessages()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index d83013afba..e18b453db3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -259,10 +259,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message) throws AMQException
{
+ closeSubscriptions();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
- replyCode = ConnectionCloseCode.get(cause.getCode());
+ replyCode = ConnectionCloseCode.get(cause.getCode());
}
catch (IllegalArgumentException iae)
{
@@ -389,7 +390,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
- public boolean isSessionNameUnique(String name)
+ public boolean isSessionNameUnique(byte[] name)
{
return !super.hasSessionWithName(name);
}
@@ -399,4 +400,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _authorizedPrincipal.getName();
}
+
+ @Override
+ public void closed()
+ {
+ closeSubscriptions();
+ super.closed();
+ }
+
+ private void closeSubscriptions()
+ {
+ for (Session ssn : getChannels())
+ {
+ ((ServerSession)ssn).unregisterSubscriptions();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 2de8a0425e..8d6e0e0d80 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -32,7 +32,9 @@ import java.util.StringTokenizer;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached;
public class ServerConnectionDelegate extends ServerDelegate
{
- private String _localFQDN;
+ private final String _localFQDN;
private final IApplicationRegistry _appRegistry;
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
- this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
}
-
public ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
IApplicationRegistry appRegistry,
@@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
}
+ private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+ {
+ final Map<String,Object> map = new HashMap<String,Object>(2);
+ map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
+ final List<String> features = brokerConfig.getFeatures();
+ if (features != null && features.size() > 0)
+ {
+ map.put(ServerPropertyNames.QPID_FEATURES, features);
+ }
+ return map;
+ }
+
private static List<Object> parseToList(String mechanisms)
{
List<Object> list = new ArrayList<Object>();
@@ -234,22 +247,22 @@ public class ServerConnectionDelegate extends ServerDelegate
@Override
public void sessionAttach(final Connection conn, final SessionAttach atc)
{
- final String clientId = new String(atc.getName());
- final Session ssn = getSession(conn, atc);
+ final Session ssn;
- if(isSessionNameUnique(clientId,conn))
+ if(isSessionNameUnique(atc.getName(), conn))
{
+ ssn = sessionAttachImpl(conn, atc);
conn.registerSession(ssn);
- super.sessionAttach(conn, atc);
}
else
{
+ ssn = getSession(conn, atc);
ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
ssn.closed();
}
}
- private boolean isSessionNameUnique(final String name, final Connection conn)
+ private boolean isSessionNameUnique(final byte[] name, final Connection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
final String userId = sconn.getUserName();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 28dec2ad28..429cc4cf66 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -172,6 +173,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void postCommit()
{
+ MessageReference<?> ref = message.newReference();
for(int i = 0; i < _queues.length; i++)
{
try
@@ -184,6 +186,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
throw new RuntimeException(e);
}
}
+ ref.release();
}
public void onRollback()
@@ -412,12 +415,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
queue.unregisterSubscription(sub);
}
-
}
catch (AMQException e)
{
// TODO
- _logger.error("Failed to unregister subscription", e);
+ _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
}
finally
{
@@ -683,12 +685,17 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
+ unregisterSubscriptions();
+
+ super.close();
+ }
+
+ void unregisterSubscriptions()
+ {
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
-
- super.close();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 17bd06538f..c87919b478 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -1261,11 +1261,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
setThreadSubject(session);
- for(Subscription_0_10 sub : getSubscriptions(session))
- {
- ((ServerSession)session).unregister(sub);
- }
- ((ServerSession)session).onClose();
+ ServerSession serverSession = (ServerSession)session;
+
+ serverSession.unregisterSubscriptions();
+ serverSession.onClose();
}
@Override
@@ -1274,11 +1273,6 @@ public class ServerSessionDelegate extends SessionDelegate
closed(session);
}
- public Collection<Subscription_0_10> getSubscriptions(Session session)
- {
- return ((ServerSession)session).getSubscriptions();
- }
-
private void setThreadSubject(Session session)
{
final ServerConnection scon = (ServerConnection) session.getConnection();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index d368a2d1ee..9afd2a45a9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -256,7 +256,7 @@ public class ServerConfigurationTest extends QpidTestCase
assertEquals(false, _serverConfig.getManagementSSLEnabled());
}
- public void testGetManagementKeyStorePassword() throws ConfigurationException
+ public void testGetManagementKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
@@ -534,43 +534,57 @@ public class ServerConfigurationTest extends QpidTestCase
assertEquals("10", _serverConfig.getSSLPorts().get(0));
}
- public void testGetKeystorePath() throws ConfigurationException
+ public void testGetConnectorKeystorePath() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePath());
+ assertNull(_serverConfig.getConnectorKeyStorePath());
// Check value we set
- _config.setProperty("connector.ssl.keystorePath", "a");
+ _config.setProperty("connector.ssl.keyStorePath", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePath());
+ assertEquals("a", _serverConfig.getConnectorKeyStorePath());
+
+ // Ensure we continue to support the old name keystorePath
+ _config.clearProperty("connector.ssl.keyStorePath");
+ _config.setProperty("connector.ssl.keystorePath", "b");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("b", _serverConfig.getConnectorKeyStorePath());
}
- public void testGetKeystorePassword() throws ConfigurationException
+ public void testGetConnectorKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePassword());
+ assertNull(_serverConfig.getConnectorKeyStorePassword());
// Check value we set
- _config.setProperty("connector.ssl.keystorePassword", "a");
+ _config.setProperty("connector.ssl.keyStorePassword", "a");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("a", _serverConfig.getConnectorKeyStorePassword());
+
+ // Ensure we continue to support the old name keystorePassword
+ _config.clearProperty("connector.ssl.keyStorePassword");
+ _config.setProperty("connector.ssl.keystorePassword", "b");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePassword());
+ assertEquals("b", _serverConfig.getConnectorKeyStorePassword());
}
- public void testGetCertType() throws ConfigurationException
+ public void testGetConnectorCertType() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertEquals("SunX509", _serverConfig.getCertType());
+ assertEquals("SunX509", _serverConfig.getConnectorCertType());
// Check value we set
_config.setProperty("connector.ssl.certType", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getCertType());
+ assertEquals("a", _serverConfig.getConnectorCertType());
}
public void testGetUseBiasedWrites() throws ConfigurationException
@@ -1284,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
/**
- * Test that a non-existant virtualhost file throws a {@link ConfigurationException}.
+ * Test that a non-existent virtualhost file throws a {@link ConfigurationException}.
* <p>
* Test for QPID-2624
*/
@@ -1312,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
+ * Tests that element disabledFeatures allows features that would
+ * otherwise be advertised by the broker to be turned off.
+ */
+ public void testDisabledFeatures() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ _serverConfig = new ServerConfiguration(_config);
+ assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size());
+
+ // Check value we set
+ _config.addProperty("disabledFeatures", "qpid.feature1");
+ _config.addProperty("disabledFeatures", "qpid.feature2");
+ _serverConfig = new ServerConfiguration(_config);
+
+ assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size());
+ assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1"));
+ }
+
+ /**
* Tests that the old element security.jmx.access (that used to be used
* to define JMX access rights) is rejected.
*/
@@ -1338,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.jmx.principal-database (that used to define the
* principal database used for JMX authentication) is rejected.
*/
@@ -1364,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.principal-databases. ... (that used to define
* principal databases) is rejected.
*/
@@ -1389,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was
* replaced by housekeeping.checkPeriod) is rejected.
*/
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
index 4a03445357..f2249c5931 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtilTest.java
@@ -72,11 +72,11 @@ public class OsgiSystemPackageUtilTest extends QpidTestCase
_map.put("org.apache.qpid.xyz", "1.0.0");
_map.put("org.abc", "1.2.3");
- _util = new OsgiSystemPackageUtil(new Version("0.13"), _map);
+ _util = new OsgiSystemPackageUtil(new Version("0.15"), _map);
final String systemPackageString = _util.getFormattedSystemPackageString();
- assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.13.0", systemPackageString);
+ assertEquals("org.abc; version=1.2.3, org.apache.qpid.xyz; version=0.15.0", systemPackageString);
}
public void testWithQpidPackageWithoutQpidReleaseNumberSet() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index 2d41eb9899..2ffa157ca8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -86,11 +86,7 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
+ message.incrementReference();
assertEquals(1, _store.getMessageCount());
message.decrementReference();
@@ -146,12 +142,12 @@ public class ReferenceCountingTest extends QpidTestCase
AMQMessage message = new AMQMessage(storedMessage);
- message = message.takeReference();
+ message.incrementReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertEquals(1, _store.getMessageCount());
- message = message.takeReference();
+ message.incrementReference();
message.decrementReference();
assertEquals(1, _store.getMessageCount());
}