summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-01 09:56:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-01 09:56:29 +0000
commit2549e2808832606b05383d8383e56d1fafffedee (patch)
tree26136053cf5ad8229351948f596c41bbe0d2afb3
parent94a44efa32a181bfef063523cb592523d48af392 (diff)
downloadqpid-python-2549e2808832606b05383d8383e56d1fafffedee.tar.gz
QPID-4970 : [Java Broker] Configure MessageStores based on VirtualHost object not XML Configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498345 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java44
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java77
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java14
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java82
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java34
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java52
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java19
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java7
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java8
-rw-r--r--qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java35
-rw-r--r--qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java21
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java36
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java27
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java41
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java78
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java8
-rw-r--r--qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory19
44 files changed, 739 insertions, 236 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index d036a5d39a..d7c8102c0e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -55,6 +55,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.*;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
@@ -166,19 +167,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
- Configuration storeConfiguration) throws Exception
+ VirtualHost virtualHost) throws Exception
{
_stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = recoveryHandler;
- configure(name, storeConfiguration);
+ configure(name, virtualHost);
}
public void configureMessageStore(String name,
MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration storeConfiguration) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
@@ -205,18 +205,35 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
/**
* Called after instantiation in order to configure the message store.
*
+ *
* @param name The name of the virtual host using this store
+ * @param virtualHost
* @return whether a new store environment was created or not (to indicate whether recovery is necessary)
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(String name, Configuration storeConfig) throws Exception
+ public void configure(String name, VirtualHost virtualHost) throws Exception
{
- final String storeLocation = storeConfig.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY,
- System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name);
- _persistentSizeHighThreshold = storeConfig.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
- _persistentSizeLowThreshold = storeConfig.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+
+ final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name;
+
+
+ String storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(storeLocation == null)
+ {
+ storeLocation = defaultPath;
+ }
+
+ Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
+
if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
@@ -234,7 +251,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo
_storeLocation = storeLocation;
- _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig");
+ _envConfigMap = new HashMap<String, String>();
+ _envConfigMap.putAll(ENVCONFIG_DEFAULTS);
+
+ Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig");
+ if(bdbEnvConfigAttr instanceof Map)
+ {
+ _envConfigMap.putAll((Map)bdbEnvConfigAttr);
+ }
LOGGER.info("Configuring BDB message store");
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index ba111e8091..561e4fa660 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -39,6 +39,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
@@ -121,17 +122,17 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private Map<String, String> _repConfig;
@Override
- public void configure(String name, Configuration storeConfig) throws Exception
+ public void configure(String name, VirtualHost virtualHost) throws Exception
{
//Mandatory configuration
- _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig);
- _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig);
- _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig);
- _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig);
+ _groupName = getValidatedStringAttribute(virtualHost, "haGroupName");
+ _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName");
+ _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress");
+ _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress");
_name = name;
//Optional configuration
- String durabilitySetting = storeConfig.getString("highAvailability.durability");
+ String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null);
if (durabilitySetting == null)
{
_durability = DEFAULT_DURABILITY;
@@ -140,9 +141,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
_durability = Durability.parse(durabilitySetting);
}
- _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE);
- _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE);
- _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");
+ _designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE);
+ _coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE);
+
+ _repConfig = new HashMap<String, String>(REPCONFIG_DEFAULTS);
+ Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig");
+ if(repConfigAttr instanceof Map)
+ {
+ _repConfig.putAll((Map)repConfigAttr);
+ }
if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
{
@@ -150,9 +157,54 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- super.configure(name, storeConfig);
+ super.configure(name, virtualHost);
+ }
+
+
+ private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName)
+ throws ConfigurationException
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ return attrValue.toString();
+ }
+ else
+ {
+ throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: "
+ + attributeName);
+ }
+ }
+
+ private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ return attrValue.toString();
+ }
+ return defaultVal;
+ }
+
+ private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ if(attrValue instanceof Boolean)
+ {
+ return ((Boolean) attrValue).booleanValue();
+ }
+ else if(attrValue instanceof String)
+ {
+ return Boolean.parseBoolean((String)attrValue);
+ }
+
+ }
+ return defaultVal;
}
+
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
@@ -209,10 +261,9 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
@Override
public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
- super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
+ super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler);
final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 0231573053..8cf1ad8a83 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
@@ -43,15 +44,16 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig)
+ VirtualHostConfiguration hostConfig,
+ VirtualHost virtualHost)
throws Exception
{
- super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
}
- protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception
+ protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
_messageStore = new BDBHAMessageStore();
@@ -72,12 +74,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore.configureConfigStore(getName(),
recoveryHandler,
- hostConfig.getStoreConfiguration());
+ virtualHost);
_messageStore.configureMessageStore(getName(),
recoveryHandler,
- recoveryHandler,
- hostConfig.getStoreConfiguration());
+ recoveryHandler
+ );
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index b01aeafb9a..3b564f33fd 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -19,12 +19,16 @@ package org.apache.qpid.server.store.berkeleydb;/*
*
*/
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -43,12 +47,14 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig) throws Exception
+ VirtualHostConfiguration hostConfig,
+ org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception
{
return new BDBHAVirtualHost(virtualHostRegistry,
- brokerStatisticsGatherer,
- parentSecurityManager,
- hostConfig);
+ brokerStatisticsGatherer,
+ parentSecurityManager,
+ hostConfig,
+ virtualHost);
}
@Override
@@ -76,31 +82,77 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
{
LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
- convertedMap.put("store.highAvailability.groupName", virtualHostAdapter.getAttribute("haGroupName"));
- convertedMap.put("store.highAvailability.nodeName", virtualHostAdapter.getAttribute("haNodeName"));
- convertedMap.put("store.highAvailability.nodeHostPort", virtualHostAdapter.getAttribute("haNodeAddress"));
- convertedMap.put("store.highAvailability.helperHostPort", virtualHostAdapter.getAttribute("haHelperAddress"));
- final Object haDurability = virtualHostAdapter.getAttribute("haDurability");
+ return convertedMap;
+ }
+
+ public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration)
+ {
+
+ LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>();
+
+ Configuration storeConfiguration = configuration.subset("store");
+
+ convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
+ convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY));
+ convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY));
+ convertedMap.put("haGroupName", configuration.getString("store.highAvailability.groupName"));
+ convertedMap.put("haNodeName", configuration.getString("store.highAvailability.nodeName"));
+ convertedMap.put("haNodeAddress", configuration.getString("store.highAvailability.nodeHostPort"));
+ convertedMap.put("haHelperAddress", configuration.getString("store.highAvailability.helperHostPort"));
+
+ final Object haDurability = configuration.getString("store.highAvailability.durability");
if(haDurability !=null)
{
- convertedMap.put("store.highAvailability.durability", haDurability);
+ convertedMap.put("haDurability", haDurability);
}
- final Object designatedPrimary = virtualHostAdapter.getAttribute("haDesignatedPrimary");
+ final Object designatedPrimary = configuration.getString("store.highAvailability.designatedPrimary");
if(designatedPrimary!=null)
{
- convertedMap.put("store.highAvailability.designatedPrimary", designatedPrimary);
+ convertedMap.put("haDesignatedPrimary", designatedPrimary);
}
- final Object coalescingSync = virtualHostAdapter.getAttribute("haCoalescingSync");
+ final Object coalescingSync = configuration.getString("store.highAvailability.coalescingSync");
if(coalescingSync!=null)
{
- convertedMap.put("store.highAvailability.coalescingSync", coalescingSync);
+ convertedMap.put("haCoalescingSync", coalescingSync);
+ }
+
+
+ Map<String, String> attributes = getEnvironmentMap(storeConfiguration, "envConfig");
+
+ if(!attributes.isEmpty())
+ {
+ convertedMap.put("bdbEnvironmentConfig",attributes);
}
- // TODO REP_CONFIG values
+ attributes = getEnvironmentMap(storeConfiguration, "repConfig");
+
+ if(!attributes.isEmpty())
+ {
+ convertedMap.put("haReplicationConfig",attributes);
+ }
return convertedMap;
+
+ }
+
+ private Map<String, String> getEnvironmentMap(Configuration storeConfiguration, String configName)
+ {
+ final List<Object> argumentNames = storeConfiguration.getList(configName +".name");
+ final List<Object> argumentValues = storeConfiguration.getList(configName +".value");
+ final int initialSize = argumentNames.size();
+
+ final Map<String,String> attributes = new HashMap<String,String>(initialSize);
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i).toString();
+ final String argValue = argumentValues.get(i).toString();
+
+ attributes.put(argName, argValue);
+ }
+ return attributes;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index ef886d3d6d..4d224ab86e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.store.MessageStore;
@@ -38,4 +43,33 @@ public class BDBMessageStoreFactory implements MessageStoreFactory
return new BDBMessageStore();
}
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration)
+ {
+ final List<Object> argumentNames = storeConfiguration.getList("envConfig.name");
+ final List<Object> argumentValues = storeConfiguration.getList("envConfig.value");
+ final int initialSize = argumentNames.size();
+
+ final Map<String,String> attributes = new HashMap<String,String>(initialSize);
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i).toString();
+ final String argValue = argumentValues.get(i).toString();
+
+ attributes.put(argName, argValue);
+ }
+
+ if(initialSize != 0)
+ {
+ return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes);
+ }
+ else
+ {
+ return Collections.emptyMap();
+ }
+
+
+ }
+
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
index 047b102817..0bbd399b9f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -35,6 +37,10 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class BDBHAMessageStoreTest extends QpidTestCase
{
@@ -48,6 +54,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase
private String _host;
private XMLConfiguration _configXml;
private VirtualHost _virtualHost;
+ private org.apache.qpid.server.model.VirtualHost _modelVhost;
public void setUp() throws Exception
{
@@ -60,6 +67,8 @@ public class BDBHAMessageStoreTest extends QpidTestCase
FileUtils.delete(new File(_workDir), true);
_configXml = new XMLConfiguration();
+ _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class);
+
BrokerTestHelper.setUp();
}
@@ -87,7 +96,8 @@ public class BDBHAMessageStoreTest extends QpidTestCase
addVirtualHostConfiguration();
String vhostName = "test" + _masterPort;
VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock());
- _virtualHost = BrokerTestHelper.createVirtualHost(configuration);
+
+ _virtualHost = BrokerTestHelper.createVirtualHost(configuration,null,_modelVhost);
BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore();
// test whether JVM system settings were applied
@@ -116,27 +126,25 @@ public class BDBHAMessageStoreTest extends QpidTestCase
_configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
_configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE);
- _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName());
- _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator
- + port);
- _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName);
- _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName);
- _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort",
- getNodeHostPortForNodeAt(port));
- _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort",
- getHelperHostPort());
-
- _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS);
- _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS);
-
- _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX);
- _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX);
-
- _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
- _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES);
-
- _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT);
- _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT);
+
+ when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator
+ + port);
+ when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName);
+ when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName);
+ when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port));
+ when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort());
+
+ Map<String,String> bdbEnvConfig = new HashMap<String,String>();
+ bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS);
+ bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX);
+
+ when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig);
+
+ Map<String,String> repConfig = new HashMap<String,String>();
+ repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES);
+ repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT);
+ when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig);
+
}
private String getNodeNameForNodeAt(final int bdbPort)
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index fe48e29d0b..8ba0d41e03 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -20,11 +20,19 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class);
@@ -54,14 +62,15 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
}
@Override
- protected void applyStoreSpecificConfiguration(XMLConfiguration config)
+ protected void applyStoreSpecificConfiguration(VirtualHost virtualHost)
{
_logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
- config.addProperty("envConfig(-1).name", "je.log.fileMax");
- config.addProperty("envConfig.value", MAX_BDB_LOG_SIZE);
- config.addProperty("overfull-size", OVERFULL_SIZE);
- config.addProperty("underfull-size", UNDERFULL_SIZE);
+ Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
+ when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap);
+ when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
+ when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
+
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 11b30e66ad..e77119b140 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageMetaDataType;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -54,6 +55,10 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Subclass of MessageStoreTest which runs the standard tests from the superclass against
* the BDB Store as well as additional tests specific to the BDB store-implementation.
@@ -226,7 +231,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
messageStore.close();
AbstractBDBMessageStore newStore = new BDBMessageStore();
- newStore.configure("", getConfig().subset("store"));
+ newStore.configure("", getVirtualHostModel());
newStore.startWithNoRecover();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
index eaa3c3eba4..5ad49462ac 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -21,18 +21,20 @@ package org.apache.qpid.server.store.berkeleydb;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import static org.mockito.Mockito.mock;
+
public class HAMessageStoreSmokeTest extends QpidTestCase
{
private final BDBHAMessageStore _store = new BDBHAMessageStore();
- private final XMLConfiguration _config = new XMLConfiguration();
public void testMissingHAConfigThrowsException() throws Exception
{
try
{
- _store.configure("test", _config);
+ _store.configure("test", mock(VirtualHost.class));
fail("Expected an exception to be thrown");
}
catch (ConfigurationException ce)
@@ -40,4 +42,4 @@ public class HAMessageStoreSmokeTest extends QpidTestCase
assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
}
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
index 7545b84611..1cde6f130d 100644
--- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
+++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
@@ -25,6 +25,7 @@ import com.jolbox.bonecp.BoneCPConfig;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
public class BoneCPConnectionProvider implements ConnectionProvider
@@ -34,17 +35,43 @@ public class BoneCPConnectionProvider implements ConnectionProvider
public static final int DEFAULT_PARTITION_COUNT = 4;
private final BoneCP _connectionPool;
- public BoneCPConnectionProvider(String connectionUrl, Configuration storeConfiguration) throws SQLException
+ public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException
{
BoneCPConfig config = new BoneCPConfig();
config.setJdbcUrl(connectionUrl);
- config.setMinConnectionsPerPartition(storeConfiguration.getInteger("minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION));
- config.setMaxConnectionsPerPartition(storeConfiguration.getInteger("maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION));
- config.setPartitionCount(storeConfiguration.getInteger("partitionCount",DEFAULT_PARTITION_COUNT));
+
+ config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, "minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION));
+ config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, "maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION));
+ config.setPartitionCount(getIntegerAttribute(virtualHost, "partitionCount",DEFAULT_PARTITION_COUNT));
_connectionPool = new BoneCP(config);
}
+ private int getIntegerAttribute(VirtualHost virtualHost, String attributeName, int defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ if(attrValue instanceof Number)
+ {
+ return ((Number) attrValue).intValue();
+ }
+ else if(attrValue instanceof String)
+ {
+ try
+ {
+ return Integer.parseInt((String)attrValue);
+ }
+ catch (NumberFormatException e)
+ {
+ return defaultVal;
+ }
+ }
+
+ }
+ return defaultVal;
+ }
+
@Override
public Connection getConnection() throws SQLException
{
diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
index 71c59c7772..73876eceb4 100644
--- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
+++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.jdbc.bonecp;
import java.sql.SQLException;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
@@ -34,9 +35,9 @@ public class BoneCPConnectionProviderFactory implements JDBCConnectionProviderFa
}
@Override
- public ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration)
+ public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost)
throws SQLException
{
- return new BoneCPConnectionProvider(connectionUrl, storeConfiguration);
+ return new BoneCPConnectionProvider(connectionUrl, virtualHost);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 383ff2f3f6..68177dac62 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -144,7 +144,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
validateAttributes(type);
}
- }
+ }/*
else
{
if (type != null)
@@ -152,7 +152,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
invalidAttributes = true;
}
- }
+ }*/
if (invalidAttributes)
{
throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes");
@@ -1109,9 +1109,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
else
{
_virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(),
- _brokerStatisticsGatherer,
- _broker.getSecurityManager(),
- configuration);
+ _brokerStatisticsGatherer,
+ _broker.getSecurityManager(),
+ configuration,
+ this);
}
}
catch (Exception e)
@@ -1172,6 +1173,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
throw new IllegalConfigurationException("Configuration file '" + configurationFile + "' does not exist");
}
configuration = new VirtualHostConfiguration(virtualHostName, new File(configurationFile) , _broker);
+ String type = configuration.getType();
+ changeAttribute(TYPE,null,type);
+ VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
+ if(factory != null)
+ {
+ for(Map.Entry<String,Object> entry : factory.convertVirtualHostConfiguration(configuration.getConfig()).entrySet())
+ {
+ changeAttribute(entry.getKey(), getAttribute(entry.getKey()), entry.getValue());
+ }
+ }
+
}
return configuration;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
index e3b7f03978..a5f4ea063b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
@@ -26,13 +26,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
public interface JDBCConnectionProviderFactory
{
String getType();
- ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration)
+ ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost)
throws SQLException;
static final class TYPES
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java
index aff06af4ce..9297f34f94 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.plugin;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.store.MessageStore;
public interface MessageStoreFactory
@@ -27,4 +29,7 @@ public interface MessageStoreFactory
String getType();
MessageStore createMessageStore();
+
+ public Map<String, Object> convertStoreConfiguration(Configuration configuration);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
index f952e0410c..2a3b65f829 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
import org.apache.qpid.server.security.SecurityManager;
@@ -39,12 +40,15 @@ public interface VirtualHostFactory
VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig) throws Exception;
+ VirtualHostConfiguration hostConfig,
+ org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
void validateAttributes(Map<String, Object> attributes);
Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter);
+ Map<String,Object> convertVirtualHostConfiguration(Configuration configuration);
+
static final class TYPES
{
private TYPES()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index b7f5035de0..758945d6a1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -41,7 +40,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
@@ -50,6 +48,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
@@ -170,21 +169,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void configureConfigStore(String name,
- ConfigurationRecoveryHandler configRecoveryHandler,
- Configuration storeConfiguration) throws Exception
+ ConfigurationRecoveryHandler configRecoveryHandler,
+ VirtualHost virtualHost) throws Exception
{
_stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
- commonConfiguration(name, storeConfiguration);
+ commonConfiguration(name, virtualHost);
}
@Override
public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration storeConfiguration) throws Exception
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
@@ -206,15 +204,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
_stateManager.attainState(State.ACTIVE);
}
- private void commonConfiguration(String name, Configuration storeConfiguration)
+ private void commonConfiguration(String name, VirtualHost virtualHost)
throws ClassNotFoundException, SQLException
{
- implementationSpecificConfiguration(name, storeConfiguration);
+ implementationSpecificConfiguration(name, virtualHost);
createOrOpenDatabase();
}
- protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException, SQLException;
+ protected abstract void implementationSpecificConfiguration(String name,
+ VirtualHost virtualHost) throws ClassNotFoundException, SQLException;
abstract protected Logger getLogger();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 27a40963f6..7092655e95 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
public interface DurableConfigurationStore
@@ -40,14 +41,16 @@ public interface DurableConfigurationStore
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
+ *
+ *
* @param name The name to be used by this store
* @param recoveryHandler Handler to be called as the store recovers on start up
- * @param config The apache commons configuration object.
+ * @param virtualHost
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
- Configuration config) throws Exception;
+ VirtualHost virtualHost) throws Exception;
/**
* Makes the specified exchange persistent.
*
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 3f1d1b9530..9b2496f262 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.model.VirtualHost;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class MemoryMessageStore extends NullMessageStore
@@ -82,13 +82,17 @@ public class MemoryMessageStore extends NullMessageStore
}
@Override
- public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ VirtualHost virtualHost) throws Exception
{
_stateManager.attainState(State.INITIALISING);
}
@Override
- public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISED);
}
@@ -97,7 +101,7 @@ public class MemoryMessageStore extends NullMessageStore
public void activate() throws Exception
{
_stateManager.attainState(State.ACTIVATING);
-
+
_stateManager.attainState(State.ACTIVE);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
index dceba4af31..476b2f127d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.plugin.MessageStoreFactory;
public class MemoryMessageStoreFactory implements MessageStoreFactory
@@ -37,4 +40,10 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory
return new MemoryMessageStore();
}
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration configuration)
+ {
+ return Collections.emptyMap();
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index bbdfaf4959..5fc6bad368 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -32,16 +32,15 @@ public interface MessageStore
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
+ *
* @param name The name to be used by this store
* @param messageRecoveryHandler Handler to be called as the store recovers on start up
* @param tlogRecoveryHandler
- * @param config The apache commons configuration object.
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configureMessageStore(String name,
MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config) throws Exception;
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception;
void activate() throws Exception;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
index 728da23f28..93b669e6e4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
@@ -25,5 +25,7 @@ public class MessageStoreConstants
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
+ public static final String OVERFULL_SIZE_ATTRIBUTE = "storeOverfullSize";
+ public static final String UNDERFULL_SIZE_ATTRIBUTE = "storeUnderfullSize";
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index f0936a221c..3b57bbfa55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -19,11 +19,11 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
@@ -31,7 +31,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
- Configuration config) throws Exception
+ VirtualHost virtualHost) throws Exception
{
}
@@ -77,8 +77,8 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
@Override
public void configureMessageStore(String name,
- MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 0d4231a10d..ac310d02c9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -32,8 +32,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
@@ -123,15 +123,20 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
@Override
- protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
+ protected void implementationSpecificConfiguration(String name,
+ VirtualHost virtualHost)
throws ClassNotFoundException
{
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
_driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
- + File.separator + "derbyDB");
+ String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
+ String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ if(databasePath == null)
+ {
+ databasePath = defaultPath;
+ }
if(!MEMORY_STORE_LOCATION.equals(databasePath))
{
@@ -148,8 +153,14 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
_storeLocation = databasePath;
- _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
- _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+
+ _persistentSizeHighThreshold = overfullAttr == null ? -1l :
+ overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
+ _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
+ underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
+
if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
index 81e5bb9ff9..0f53d66435 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.derby;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.store.MessageStore;
@@ -38,4 +41,10 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory
return new DerbyMessageStore();
}
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration configuration)
+ {
+ return Collections.emptyMap();
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
index 0f074cc95b..a2d3644590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store.jdbc;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory
@@ -34,7 +35,7 @@ public class DefaultConnectionProviderFactory implements JDBCConnectionProviderF
@Override
public ConnectionProvider getConnectionProvider(String connectionUrl,
- Configuration storeConfiguration)
+ VirtualHost virtualHost)
{
return new DefaultConnectionProvider(connectionUrl);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 79093fe2e2..951ea28c20 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -29,13 +29,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
@@ -275,13 +274,15 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
- protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration)
+ protected void implementationSpecificConfiguration(String name,
+ VirtualHost virtualHost)
throws ClassNotFoundException, SQLException
{
- String connectionURL = storeConfiguration.getString("connectionUrl",
- storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
+ String connectionURL = virtualHost.getAttribute("connectionURL") == null
+ ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH))
+ : String.valueOf(virtualHost.getAttribute("connectionURL"));
JDBCDetails details = null;
@@ -301,8 +302,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
- Configuration poolConfig = storeConfiguration.subset("pool");
- String connectionPoolType = poolConfig.getString("type", "DEFAULT");
+ Object poolAttribute = virtualHost.getAttribute("connectionPool");
+ String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute);
+
JDBCConnectionProviderFactory connectionProviderFactory =
JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType);
if(connectionProviderFactory == null)
@@ -311,14 +313,44 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
connectionProviderFactory = new DefaultConnectionProviderFactory();
}
- _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, poolConfig);
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost);
+
+ _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType());
+ _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType());
+ _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob());
+ _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType());
+ }
+
+
+ private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ return attrValue.toString();
+ }
+ return defaultVal;
+ }
+
+ private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal)
+ {
+ Object attrValue = virtualHost.getAttribute(attributeName);
+ if(attrValue != null)
+ {
+ if(attrValue instanceof Boolean)
+ {
+ return ((Boolean) attrValue).booleanValue();
+ }
+ else if(attrValue instanceof String)
+ {
+ return Boolean.parseBoolean((String)attrValue);
+ }
- _blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType());
- _varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType());
- _useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob());
- _bigIntType = storeConfiguration.getString("sqlBigIntType", details.getBigintType());
+ }
+ return defaultVal;
}
+
protected void storedSizeChange(int contentSize)
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
index 1446ad34e9..99ec4c7d32 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.jdbc;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.store.MessageStore;
@@ -38,4 +41,25 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory
return new JDBCMessageStore();
}
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration)
+ {
+ Map<String,Object> convertedMap = new HashMap<String,Object>();
+ convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType"));
+ convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType"));
+ if(storeConfiguration.containsKey("useBytesForBlob"))
+ {
+ convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob"));
+ }
+ convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType"));
+ convertedMap.put("connectionPool", storeConfiguration.getString("pool.type"));
+ convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition",
+ null));
+ convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition",
+ null));
+ convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null));
+
+ return convertedMap;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 6116d46e41..a704ca112b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -57,12 +56,8 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreCreator;
-import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -107,7 +102,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig) throws Exception
+ VirtualHostConfiguration hostConfig,
+ org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception
{
if (hostConfig == null)
{
@@ -144,13 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
initialiseStatistics();
- initialiseStorage(hostConfig);
+ initialiseStorage(hostConfig, virtualHost);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
- abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception;
+ abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
+ org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
public IConnectionRegistry getConnectionRegistry()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 05a33e7d99..82be0c01e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -19,12 +19,11 @@ package org.apache.qpid.server.virtualhost;/*
*
*/
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreCreator;
import org.apache.qpid.server.store.OperationalLoggingListener;
@@ -36,18 +35,19 @@ public class StandardVirtualHost extends AbstractVirtualHost
private DurableConfigurationStore _durableConfigurationStore;
StandardVirtualHost(VirtualHostRegistry virtualHostRegistry,
- StatisticsGatherer brokerStatisticsGatherer,
- org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig) throws Exception
+ StatisticsGatherer brokerStatisticsGatherer,
+ org.apache.qpid.server.security.SecurityManager parentSecurityManager,
+ VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
- super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
}
- private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+ private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
- String storeType = hostConfig.getConfig().getString("store.type");
+ final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.STORE_TYPE);
+ String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr);
MessageStore messageStore = null;
if (storeType == null)
{
@@ -74,7 +74,7 @@ public class StandardVirtualHost extends AbstractVirtualHost
return messageStore;
}
- private DurableConfigurationStore initialiseConfigurationStore(VirtualHostConfiguration hostConfig) throws Exception
+ private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception
{
DurableConfigurationStore configurationStore;
if(getMessageStore() instanceof DurableConfigurationStore)
@@ -90,19 +90,17 @@ public class StandardVirtualHost extends AbstractVirtualHost
}
- protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception
+ protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception
{
- _messageStore = initialiseMessageStore(hostConfig);
+ _messageStore = initialiseMessageStore(hostConfig, virtualHost);
- _durableConfigurationStore = initialiseConfigurationStore(hostConfig);
+ _durableConfigurationStore = initialiseConfigurationStore(virtualHost);
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- final Configuration storeConfiguration = hostConfig.getStoreConfiguration();
+ _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost);
- _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, storeConfiguration);
-
- _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, storeConfiguration);
+ _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler);
initialiseModel(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
index 3a604dbd90..a0f22aa34c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
@@ -19,13 +19,21 @@ package org.apache.qpid.server.virtualhost;/*
*
*/
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.model.adapter.VirtualHostAdapter;
+import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreCreator;
public class StandardVirtualHostFactory implements VirtualHostFactory
@@ -43,9 +51,10 @@ public class StandardVirtualHostFactory implements VirtualHostFactory
public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
- VirtualHostConfiguration hostConfig) throws Exception
+ VirtualHostConfiguration hostConfig,
+ org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception
{
- return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig);
+ return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost);
}
@@ -94,13 +103,26 @@ public class StandardVirtualHostFactory implements VirtualHostFactory
convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE));
convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH));
- // TODO - this should all be inverted to populate vhost from xml and then pass model object to the store
+ return convertedMap;
+ }
+
+ @Override
+ public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration)
+ {
+ Map<String,Object> convertedMap = new LinkedHashMap<String, Object>();
+ Configuration storeConfiguration = configuration.subset("store");
+ convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_TYPE, storeConfiguration.getString("type"));
+ convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY));
- convertedMap.put("store.pool.type",virtualHostAdapter.getAttribute("connectionPool"));
- convertedMap.put("store.pool.minConnectionsPerPartition",virtualHostAdapter.getAttribute("minConnectionsPerPartition"));
- convertedMap.put("store.pool.maxConnectionsPerPartition",virtualHostAdapter.getAttribute("maxConnectionsPerPartition"));
- convertedMap.put("store.pool.partitionCount",virtualHostAdapter.getAttribute("partitionCount"));
+ convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY));
+ convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY));
+
+ for(MessageStoreFactory mf : new MessageStoreCreator().getFactories())
+ {
+ convertedMap.putAll(mf.convertStoreConfiguration(storeConfiguration));
+ }
return convertedMap;
+
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
index 8a7d5d85fc..57e1fcd15c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -49,6 +49,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockStoredMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
@@ -67,6 +68,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
private String _storeName;
private MessageStore _messageStore;
private Configuration _configuration;
+ private VirtualHost _virtualHost;
private ConfigurationRecoveryHandler _recoveryHandler;
private QueueRecoveryHandler _queueRecoveryHandler;
@@ -107,6 +109,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
+ _virtualHost = mock(VirtualHost.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
@@ -118,6 +121,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase
when(_exchange.getId()).thenReturn(_exchangeId);
when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn(
_storePath);
+ when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath);
_bindingArgs = new FieldTable();
AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
@@ -314,8 +318,8 @@ public class DurableConfigurationStoreTest extends QpidTestCase
_messageStore = createMessageStore();
_configStore = createConfigStore();
- _configStore.configureConfigStore(_storeName, _recoveryHandler, _configuration);
- _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration);
+ _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost);
+ _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler);
_messageStore.activate();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 8743c4111b..f57195b2d7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -37,9 +36,14 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource
{
private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class);
@@ -54,7 +58,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
protected abstract MessageStore createStore() throws Exception;
- protected abstract void applyStoreSpecificConfiguration(XMLConfiguration config);
+ protected abstract void applyStoreSpecificConfiguration(VirtualHost virtualHost);
protected abstract int getNumberOfMessagesToFillStore();
@@ -66,12 +70,13 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_storeLocation = new File(new File(TMP_FOLDER), getTestName());
FileUtils.delete(_storeLocation, true);
- XMLConfiguration config = new XMLConfiguration();
- config.addProperty("environment-path", _storeLocation.getAbsolutePath());
- applyStoreSpecificConfiguration(config);
+
+ VirtualHost vhost = mock(VirtualHost.class);
+ when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath());
+ applyStoreSpecificConfiguration(vhost);
_store = createStore();
- ((DurableConfigurationStore)_store).configureConfigStore("test", null, config);
+ ((DurableConfigurationStore)_store).configureConfigStore("test", null, vhost);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index fb255e89f9..5eea002365 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -62,6 +62,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* This tests the MessageStores by using the available interfaces.
*
@@ -98,26 +102,42 @@ public class MessageStoreTest extends QpidTestCase
private PropertiesConfiguration _config;
private VirtualHost _virtualHost;
+ private org.apache.qpid.server.model.VirtualHost _virtualHostModel;
private Broker _broker;
+ private String _storePath;
public void setUp() throws Exception
{
super.setUp();
BrokerTestHelper.setUp();
- String storePath = System.getProperty("QPID_WORK") + File.separator + getName();
+ _storePath = System.getProperty("QPID_WORK") + File.separator + getName();
_config = new PropertiesConfiguration();
_config.addProperty("store.class", getTestProfileMessageStoreClassName());
- _config.addProperty("store.environment-path", storePath);
+ _config.addProperty("store.environment-path", _storePath);
+ _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
+ when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath);
+
+
- cleanup(new File(storePath));
+ cleanup(new File(_storePath));
_broker = BrokerTestHelper.createBrokerMock();
reloadVirtualHost();
}
+ protected String getStorePath()
+ {
+ return _storePath;
+ }
+
+ protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel()
+ {
+ return _virtualHostModel;
+ }
+
@Override
public void tearDown() throws Exception
{
@@ -164,7 +184,7 @@ public class MessageStoreTest extends QpidTestCase
try
{
- _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker));
+ _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),null,getVirtualHostModel());
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 88d5852a17..a73057ebc1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -25,8 +25,7 @@ import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
@@ -44,9 +43,9 @@ public abstract class MessageStoreTestCase extends QpidTestCase
private TransactionLogRecoveryHandler _logRecoveryHandler;
private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
+ private VirtualHost _virtualHost;
private MessageStore _store;
- private Configuration _storeConfiguration;
public void setUp() throws Exception
{
@@ -61,6 +60,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
+ _virtualHost = mock(VirtualHost.class);
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler);
@@ -69,15 +69,15 @@ public abstract class MessageStoreTestCase extends QpidTestCase
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
- _storeConfiguration = new PropertiesConfiguration();
- setUpStoreConfiguration(_storeConfiguration);
+ setUpStoreConfiguration(_virtualHost);
_store = createMessageStore();
- ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration);
- _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration);
+ ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _virtualHost);
+
+ _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler);
}
- protected abstract void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception;
+ protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception;
protected abstract MessageStore createMessageStore();
@@ -86,8 +86,4 @@ public abstract class MessageStoreTestCase extends QpidTestCase
return _store;
}
- public Configuration getStoreConfiguration()
- {
- return _storeConfiguration;
- }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index 5d316fca43..479675dac1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server.store.derby;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class);
@@ -46,12 +50,12 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
}
@Override
- protected void applyStoreSpecificConfiguration(XMLConfiguration config)
+ protected void applyStoreSpecificConfiguration(VirtualHost vhost)
{
_logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
- config.addProperty("overfull-size", OVERFULL_SIZE);
- config.addProperty("underfull-size", UNDERFULL_SIZE);
+ when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
+ when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 1747588bf1..859fad629b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -22,11 +22,14 @@ package org.apache.qpid.server.store.derby;
import java.io.File;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.util.FileUtils;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
public class DerbyMessageStoreTest extends MessageStoreTestCase
{
private String _storeLocation;
@@ -57,10 +60,10 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
}
@Override
- protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception
+ protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
{
_storeLocation = TMP_FOLDER + File.separator + getTestName();
- storeConfiguration.setProperty("environment-path", _storeLocation);
+ when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation);
deleteStoreIfExists();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index bb118eaaf7..a8e0460cea 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -28,11 +28,14 @@ import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.server.store.derby.DerbyMessageStore;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
public class JDBCMessageStoreTest extends MessageStoreTestCase
{
private String _connectionURL;
@@ -61,10 +64,11 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
}
@Override
- protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception
+ protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
- storeConfiguration.addProperty("connectionUrl", _connectionURL);
+
+ when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL);
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 1c8939d117..7811d04997 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -58,7 +58,6 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.plugin.VirtualHostFactory;
-import org.apache.qpid.server.virtualhost.VirtualHostFactoryRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class BrokerTestHelper
@@ -98,6 +97,12 @@ public class BrokerTestHelper
public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry)
throws Exception
{
+ return createVirtualHost(virtualHostConfiguration, virtualHostRegistry, mock(org.apache.qpid.server.model.VirtualHost.class));
+ }
+
+ public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry, org.apache.qpid.server.model.VirtualHost modelVHost)
+ throws Exception
+ {
StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class);
final VirtualHostFactory factory =
virtualHostConfiguration == null ? new StandardVirtualHostFactory()
@@ -105,18 +110,18 @@ public class BrokerTestHelper
VirtualHost host = factory.createVirtualHost(virtualHostRegistry,
statisticsGatherer,
new SecurityManager(mock(Broker.class), false),
- virtualHostConfiguration);
- virtualHostRegistry.registerVirtualHost(host);
+ virtualHostConfiguration,
+ modelVHost);
+ if(virtualHostRegistry != null)
+ {
+ virtualHostRegistry.registerVirtualHost(host);
+ }
return host;
}
public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration) throws Exception
{
- final VirtualHostFactory factory =
- virtualHostConfiguration == null ? new StandardVirtualHostFactory()
- : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType());
-
- return factory.createVirtualHost(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration);
+ return createVirtualHost(virtualHostConfiguration, null);
}
public static VirtualHost createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index 1243d9f7dd..ae09e8d7e7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -264,7 +264,8 @@ public class StandardVirtualHostTest extends QpidTestCase
_virtualHostRegistry = broker.getVirtualHostRegistry();
VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker);
- VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
+ VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration,
+ mock(org.apache.qpid.server.model.VirtualHost.class));
_virtualHostRegistry.registerVirtualHost(host);
return host;
@@ -364,7 +365,8 @@ public class StandardVirtualHostTest extends QpidTestCase
Configuration config = new PropertiesConfiguration();
config.setProperty("store.type", MemoryMessageStore.TYPE);
VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker);
- VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration);
+ VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration,
+ mock(org.apache.qpid.server.model.VirtualHost.class));
_virtualHostRegistry.registerVirtualHost(host);
return host;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 07965cfa95..69efb7e310 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -23,10 +23,10 @@ package org.apache.qpid.server.store;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.model.VirtualHost;
public class QuotaMessageStore extends NullMessageStore
{
@@ -47,12 +47,27 @@ public class QuotaMessageStore extends NullMessageStore
}
@Override
- public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config)
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ VirtualHost virtualHost)
throws Exception
{
- _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
- _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY,
- _persistentSizeHighThreshold);
+ Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ _persistentSizeHighThreshold = overfullAttr == null
+ ? Long.MAX_VALUE
+ : overfullAttr instanceof Number
+ ? ((Number)overfullAttr).longValue()
+ : Long.parseLong(overfullAttr.toString());
+
+ Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+
+ _persistentSizeLowThreshold = overfullAttr == null
+ ? _persistentSizeHighThreshold
+ : underfullAttr instanceof Number
+ ? ((Number)underfullAttr).longValue()
+ : Long.parseLong(underfullAttr.toString());
+
+
if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
@@ -62,7 +77,7 @@ public class QuotaMessageStore extends NullMessageStore
@Override
public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
_stateManager.attainState(State.INITIALISED);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index ed76c40717..76250e126a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
+import java.util.Collections;
+import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
@@ -29,11 +30,11 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.Iterator;
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -51,19 +52,22 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
// ***** MessageStore Interface.
public void configureConfigStore(String name,
- ConfigurationRecoveryHandler recoveryHandler,
- Configuration config) throws Exception
+ ConfigurationRecoveryHandler recoveryHandler,
+ VirtualHost virtualHost) throws Exception
{
_logger.info("Starting SlowMessageStore on Virtualhost:" + name);
- Configuration delays = config.subset(DELAYS);
+ Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays");
+
+ Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap();
configureDelays(delays);
- String messageStoreClass = config.getString("realStore");
+ final Object realStoreAttr = virtualHost.getAttribute("realStore");
+ String messageStoreClass = realStoreAttr == null ? null : realStoreAttr.toString();
if (delays.containsKey(DEFAULT_DELAY))
{
- _defaultDelay = delays.getLong(DEFAULT_DELAY);
+ _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
}
if (messageStoreClass != null)
@@ -83,25 +87,23 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
_durableConfigurationStore = (DurableConfigurationStore)o;
}
}
- _durableConfigurationStore.configureConfigStore(name, recoveryHandler, config);
+ _durableConfigurationStore.configureConfigStore(name, recoveryHandler, virtualHost);
}
- private void configureDelays(Configuration config)
+ private void configureDelays(Map<Object, Object> config)
{
- @SuppressWarnings("unchecked")
- Iterator<String> delays = config.getKeys();
- while (delays.hasNext())
+ for(Map.Entry<Object, Object> entry : config.entrySet())
{
- String key = (String) delays.next();
- if (key.endsWith(PRE))
+ String key = String.valueOf(entry.getKey());
+ if (key.startsWith(PRE))
{
- _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key));
+ _preDelays.put(key.substring(PRE.length()), Long.parseLong(String.valueOf(entry.getValue())));
}
- else if (key.endsWith(POST))
+ else if (key.startsWith(POST))
{
- _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key));
+ _postDelays.put(key.substring(POST.length()), Long.parseLong(String.valueOf(entry.getValue())));
}
}
}
@@ -156,10 +158,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void configureMessageStore(String name,
MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler,
- Configuration config) throws Exception
+ TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception
{
- _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
+ _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler);
}
public void close() throws Exception
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
new file mode 100644
index 0000000000..a798e6d50e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
@@ -0,0 +1,78 @@
+package org.apache.qpid.server.store;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.plugin.MessageStoreFactory;
+
+public class SlowMessageStoreFactory implements MessageStoreFactory
+{
+ @Override
+ public String getType()
+ {
+ return "SLOW";
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new SlowMessageStore();
+ }
+
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration)
+ {
+ Map<String, Object> convertedMap = new HashMap<String, Object>();
+ Configuration delaysConfig = storeConfiguration.subset("delays");
+
+ @SuppressWarnings("unchecked")
+ Iterator<String> delays = delaysConfig.getKeys();
+
+ Map<String,Long> delaysMap = new HashMap<String, Long>();
+
+ while (delays.hasNext())
+ {
+ String key = delays.next();
+
+ if (key.endsWith("pre"))
+ {
+ delaysMap.put("pre"+key.substring(0, key.length() - 4), delaysConfig.getLong(key));
+ }
+ else if (key.endsWith("post"))
+ {
+ delaysMap.put("post"+key.substring(0, key.length() - 5), delaysConfig.getLong(key));
+ }
+ }
+
+ if(!delaysMap.isEmpty())
+ {
+ convertedMap.put("slowMessageStoreDelays",delaysMap);
+ }
+
+
+ convertedMap.put("realStore", storeConfiguration.getString("realStore", null));
+
+
+ return convertedMap;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
index 4a81480671..182cd5ff0c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -55,10 +55,10 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
- setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type",
- StandardVirtualHostFactory.TYPE);
- setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore");
- setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
+ final String prefix = "virtualhosts.virtualhost." + VIRTUALHOST;
+ setVirtualHostConfigurationProperty(prefix + ".type", StandardVirtualHostFactory.TYPE);
+ setVirtualHostConfigurationProperty(prefix + ".store.class", "org.apache.qpid.server.store.SlowMessageStore");
+ setVirtualHostConfigurationProperty(prefix + ".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
super.setUp();
diff --git a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
new file mode 100644
index 0000000000..fdd7a904c3
--- /dev/null
+++ b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.SlowMessageStoreFactory