diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-01 09:56:29 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-01 09:56:29 +0000 |
commit | 2549e2808832606b05383d8383e56d1fafffedee (patch) | |
tree | 26136053cf5ad8229351948f596c41bbe0d2afb3 | |
parent | 94a44efa32a181bfef063523cb592523d48af392 (diff) | |
download | qpid-python-2549e2808832606b05383d8383e56d1fafffedee.tar.gz |
QPID-4970 : [Java Broker] Configure MessageStores based on VirtualHost object not XML Configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498345 13f79535-47bb-0310-9956-ffa450edef68
44 files changed, 739 insertions, 236 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index d036a5d39a..d7c8102c0e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -55,6 +55,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.*; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; @@ -166,19 +167,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, - Configuration storeConfiguration) throws Exception + VirtualHost virtualHost) throws Exception { _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = recoveryHandler; - configure(name, storeConfiguration); + configure(name, virtualHost); } public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration storeConfiguration) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _messageRecoveryHandler = messageRecoveryHandler; _tlogRecoveryHandler = tlogRecoveryHandler; @@ -205,18 +205,35 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo /** * Called after instantiation in order to configure the message store. * + * * @param name The name of the virtual host using this store + * @param virtualHost * @return whether a new store environment was created or not (to indicate whether recovery is necessary) * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - public void configure(String name, Configuration storeConfig) throws Exception + public void configure(String name, VirtualHost virtualHost) throws Exception { - final String storeLocation = storeConfig.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, - System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name); - _persistentSizeHighThreshold = storeConfig.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); - _persistentSizeLowThreshold = storeConfig.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + + final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name; + + + String storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(storeLocation == null) + { + storeLocation = defaultPath; + } + + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; @@ -234,7 +251,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore, DurableCo _storeLocation = storeLocation; - _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, "envConfig"); + _envConfigMap = new HashMap<String, String>(); + _envConfigMap.putAll(ENVCONFIG_DEFAULTS); + + Object bdbEnvConfigAttr = virtualHost.getAttribute("bdbEnvironmentConfig"); + if(bdbEnvConfigAttr instanceof Map) + { + _envConfigMap.putAll((Map)bdbEnvConfigAttr); + } LOGGER.info("Configuring BDB message store"); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index ba111e8091..561e4fa660 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -39,6 +39,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; @@ -121,17 +122,17 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private Map<String, String> _repConfig; @Override - public void configure(String name, Configuration storeConfig) throws Exception + public void configure(String name, VirtualHost virtualHost) throws Exception { //Mandatory configuration - _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig); - _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig); - _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig); - _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig); + _groupName = getValidatedStringAttribute(virtualHost, "haGroupName"); + _nodeName = getValidatedStringAttribute(virtualHost, "haNodeName"); + _nodeHostPort = getValidatedStringAttribute(virtualHost, "haNodeAddress"); + _helperHostPort = getValidatedStringAttribute(virtualHost, "haHelperAddress"); _name = name; //Optional configuration - String durabilitySetting = storeConfig.getString("highAvailability.durability"); + String durabilitySetting = getStringAttribute(virtualHost,"haDurability",null); if (durabilitySetting == null) { _durability = DEFAULT_DURABILITY; @@ -140,9 +141,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { _durability = Durability.parse(durabilitySetting); } - _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE); - _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE); - _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); + _designatedPrimary = getBooleanAttribute(virtualHost, "haDesignatedPrimary", Boolean.FALSE); + _coalescingSync = getBooleanAttribute(virtualHost, "haCoalescingSync", Boolean.TRUE); + + _repConfig = new HashMap<String, String>(REPCONFIG_DEFAULTS); + Object repConfigAttr = virtualHost.getAttribute("haReplicationConfig"); + if(repConfigAttr instanceof Map) + { + _repConfig.putAll((Map)repConfigAttr); + } if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) { @@ -150,9 +157,54 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess + "! Please set highAvailability.coalescingSync to false in store configuration."); } - super.configure(name, storeConfig); + super.configure(name, virtualHost); + } + + + private String getValidatedStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName) + throws ConfigurationException + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + return attrValue.toString(); + } + else + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration attribute: " + + attributeName); + } + } + + private String getStringAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, String defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + return attrValue.toString(); + } + return defaultVal; + } + + private boolean getBooleanAttribute(org.apache.qpid.server.model.VirtualHost virtualHost, String attributeName, boolean defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + if(attrValue instanceof Boolean) + { + return ((Boolean) attrValue).booleanValue(); + } + else if(attrValue instanceof String) + { + return Boolean.parseBoolean((String)attrValue); + } + + } + return defaultVal; } + @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { @@ -209,10 +261,9 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 0231573053..8cf1ad8a83 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; @@ -43,15 +44,16 @@ public class BDBHAVirtualHost extends AbstractVirtualHost BDBHAVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) + VirtualHostConfiguration hostConfig, + VirtualHost virtualHost) throws Exception { - super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost); } - protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { _messageStore = new BDBHAMessageStore(); @@ -72,12 +74,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore.configureConfigStore(getName(), recoveryHandler, - hostConfig.getStoreConfiguration()); + virtualHost); _messageStore.configureMessageStore(getName(), recoveryHandler, - recoveryHandler, - hostConfig.getStoreConfiguration()); + recoveryHandler + ); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index b01aeafb9a..3b564f33fd 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -19,12 +19,16 @@ package org.apache.qpid.server.store.berkeleydb;/* * */ +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.model.adapter.VirtualHostAdapter; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -43,12 +47,14 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) throws Exception + VirtualHostConfiguration hostConfig, + org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception { return new BDBHAVirtualHost(virtualHostRegistry, - brokerStatisticsGatherer, - parentSecurityManager, - hostConfig); + brokerStatisticsGatherer, + parentSecurityManager, + hostConfig, + virtualHost); } @Override @@ -76,31 +82,77 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory { LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); - convertedMap.put("store.highAvailability.groupName", virtualHostAdapter.getAttribute("haGroupName")); - convertedMap.put("store.highAvailability.nodeName", virtualHostAdapter.getAttribute("haNodeName")); - convertedMap.put("store.highAvailability.nodeHostPort", virtualHostAdapter.getAttribute("haNodeAddress")); - convertedMap.put("store.highAvailability.helperHostPort", virtualHostAdapter.getAttribute("haHelperAddress")); - final Object haDurability = virtualHostAdapter.getAttribute("haDurability"); + return convertedMap; + } + + public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration) + { + + LinkedHashMap<String,Object> convertedMap = new LinkedHashMap<String, Object>(); + + Configuration storeConfiguration = configuration.subset("store"); + + convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); + convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY)); + convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY)); + convertedMap.put("haGroupName", configuration.getString("store.highAvailability.groupName")); + convertedMap.put("haNodeName", configuration.getString("store.highAvailability.nodeName")); + convertedMap.put("haNodeAddress", configuration.getString("store.highAvailability.nodeHostPort")); + convertedMap.put("haHelperAddress", configuration.getString("store.highAvailability.helperHostPort")); + + final Object haDurability = configuration.getString("store.highAvailability.durability"); if(haDurability !=null) { - convertedMap.put("store.highAvailability.durability", haDurability); + convertedMap.put("haDurability", haDurability); } - final Object designatedPrimary = virtualHostAdapter.getAttribute("haDesignatedPrimary"); + final Object designatedPrimary = configuration.getString("store.highAvailability.designatedPrimary"); if(designatedPrimary!=null) { - convertedMap.put("store.highAvailability.designatedPrimary", designatedPrimary); + convertedMap.put("haDesignatedPrimary", designatedPrimary); } - final Object coalescingSync = virtualHostAdapter.getAttribute("haCoalescingSync"); + final Object coalescingSync = configuration.getString("store.highAvailability.coalescingSync"); if(coalescingSync!=null) { - convertedMap.put("store.highAvailability.coalescingSync", coalescingSync); + convertedMap.put("haCoalescingSync", coalescingSync); + } + + + Map<String, String> attributes = getEnvironmentMap(storeConfiguration, "envConfig"); + + if(!attributes.isEmpty()) + { + convertedMap.put("bdbEnvironmentConfig",attributes); } - // TODO REP_CONFIG values + attributes = getEnvironmentMap(storeConfiguration, "repConfig"); + + if(!attributes.isEmpty()) + { + convertedMap.put("haReplicationConfig",attributes); + } return convertedMap; + + } + + private Map<String, String> getEnvironmentMap(Configuration storeConfiguration, String configName) + { + final List<Object> argumentNames = storeConfiguration.getList(configName +".name"); + final List<Object> argumentValues = storeConfiguration.getList(configName +".value"); + final int initialSize = argumentNames.size(); + + final Map<String,String> attributes = new HashMap<String,String>(initialSize); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i).toString(); + final String argValue = argumentValues.get(i).toString(); + + attributes.put(argName, argValue); + } + return attributes; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index ef886d3d6d..4d224ab86e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.store.MessageStore; @@ -38,4 +43,33 @@ public class BDBMessageStoreFactory implements MessageStoreFactory return new BDBMessageStore(); } + @Override + public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) + { + final List<Object> argumentNames = storeConfiguration.getList("envConfig.name"); + final List<Object> argumentValues = storeConfiguration.getList("envConfig.value"); + final int initialSize = argumentNames.size(); + + final Map<String,String> attributes = new HashMap<String,String>(initialSize); + + for (int i = 0; i < argumentNames.size(); i++) + { + final String argName = argumentNames.get(i).toString(); + final String argValue = argumentValues.get(i).toString(); + + attributes.put(argName, argValue); + } + + if(initialSize != 0) + { + return Collections.singletonMap("bdbEnvironmentConfig", (Object)attributes); + } + else + { + return Collections.emptyMap(); + } + + + } + } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java index 047b102817..0bbd399b9f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.util.BrokerTestHelper; @@ -35,6 +37,10 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class BDBHAMessageStoreTest extends QpidTestCase { @@ -48,6 +54,7 @@ public class BDBHAMessageStoreTest extends QpidTestCase private String _host; private XMLConfiguration _configXml; private VirtualHost _virtualHost; + private org.apache.qpid.server.model.VirtualHost _modelVhost; public void setUp() throws Exception { @@ -60,6 +67,8 @@ public class BDBHAMessageStoreTest extends QpidTestCase FileUtils.delete(new File(_workDir), true); _configXml = new XMLConfiguration(); + _modelVhost = mock(org.apache.qpid.server.model.VirtualHost.class); + BrokerTestHelper.setUp(); } @@ -87,7 +96,8 @@ public class BDBHAMessageStoreTest extends QpidTestCase addVirtualHostConfiguration(); String vhostName = "test" + _masterPort; VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, _configXml.subset("virtualhosts.virtualhost." + vhostName), BrokerTestHelper.createBrokerMock()); - _virtualHost = BrokerTestHelper.createVirtualHost(configuration); + + _virtualHost = BrokerTestHelper.createVirtualHost(configuration,null,_modelVhost); BDBHAMessageStore store = (BDBHAMessageStore) _virtualHost.getMessageStore(); // test whether JVM system settings were applied @@ -116,27 +126,25 @@ public class BDBHAMessageStoreTest extends QpidTestCase _configXml.addProperty("virtualhosts.virtualhost.name", vhostName); _configXml.addProperty(vhostPrefix + ".type", BDBHAVirtualHostFactory.TYPE); - _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName()); - _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator - + port); - _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName); - _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName); - _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort", - getNodeHostPortForNodeAt(port)); - _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort", - getHelperHostPort()); - - _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS); - _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS); - - _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX); - _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX); - - _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES); - _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES); - - _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT); - _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT); + + when(_modelVhost.getAttribute(eq(_modelVhost.STORE_PATH))).thenReturn(_workDir + File.separator + + port); + when(_modelVhost.getAttribute(eq("haGroupName"))).thenReturn(_groupName); + when(_modelVhost.getAttribute(eq("haNodeName"))).thenReturn(nodeName); + when(_modelVhost.getAttribute(eq("haNodeAddress"))).thenReturn(getNodeHostPortForNodeAt(port)); + when(_modelVhost.getAttribute(eq("haHelperAddress"))).thenReturn(getHelperHostPort()); + + Map<String,String> bdbEnvConfig = new HashMap<String,String>(); + bdbEnvConfig.put(EnvironmentConfig.CLEANER_THREADS, TEST_NUMBER_OF_THREADS); + bdbEnvConfig.put(EnvironmentConfig.LOG_FILE_MAX, TEST_LOG_FILE_MAX); + + when(_modelVhost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(bdbEnvConfig); + + Map<String,String> repConfig = new HashMap<String,String>(); + repConfig.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, TEST_ELECTION_RETRIES); + repConfig.put(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT, TEST_ENV_CONSISTENCY_TIMEOUT); + when(_modelVhost.getAttribute(eq("haReplicationConfig"))).thenReturn(repConfig); + } private String getNodeNameForNodeAt(final int bdbPort) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index fe48e29d0b..8ba0d41e03 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,11 +20,19 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); @@ -54,14 +62,15 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB } @Override - protected void applyStoreSpecificConfiguration(XMLConfiguration config) + protected void applyStoreSpecificConfiguration(VirtualHost virtualHost) { _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - config.addProperty("envConfig(-1).name", "je.log.fileMax"); - config.addProperty("envConfig.value", MAX_BDB_LOG_SIZE); - config.addProperty("overfull-size", OVERFULL_SIZE); - config.addProperty("underfull-size", UNDERFULL_SIZE); + Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); + when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); + when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); + when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); + } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 11b30e66ad..e77119b140 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageMetaDataType; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -54,6 +55,10 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Subclass of MessageStoreTest which runs the standard tests from the superclass against * the BDB Store as well as additional tests specific to the BDB store-implementation. @@ -226,7 +231,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto messageStore.close(); AbstractBDBMessageStore newStore = new BDBMessageStore(); - newStore.configure("", getConfig().subset("store")); + newStore.configure("", getVirtualHostModel()); newStore.startWithNoRecover(); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java index eaa3c3eba4..5ad49462ac 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.server.store.berkeleydb; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import static org.mockito.Mockito.mock; + public class HAMessageStoreSmokeTest extends QpidTestCase { private final BDBHAMessageStore _store = new BDBHAMessageStore(); - private final XMLConfiguration _config = new XMLConfiguration(); public void testMissingHAConfigThrowsException() throws Exception { try { - _store.configure("test", _config); + _store.configure("test", mock(VirtualHost.class)); fail("Expected an exception to be thrown"); } catch (ConfigurationException ce) @@ -40,4 +42,4 @@ public class HAMessageStoreSmokeTest extends QpidTestCase assertTrue(ce.getMessage().contains("BDB HA configuration key not found")); } } -}
\ No newline at end of file +} diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java index 7545b84611..1cde6f130d 100644 --- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java +++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java @@ -25,6 +25,7 @@ import com.jolbox.bonecp.BoneCPConfig; import java.sql.Connection; import java.sql.SQLException; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.jdbc.ConnectionProvider; public class BoneCPConnectionProvider implements ConnectionProvider @@ -34,17 +35,43 @@ public class BoneCPConnectionProvider implements ConnectionProvider public static final int DEFAULT_PARTITION_COUNT = 4; private final BoneCP _connectionPool; - public BoneCPConnectionProvider(String connectionUrl, Configuration storeConfiguration) throws SQLException + public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException { BoneCPConfig config = new BoneCPConfig(); config.setJdbcUrl(connectionUrl); - config.setMinConnectionsPerPartition(storeConfiguration.getInteger("minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION)); - config.setMaxConnectionsPerPartition(storeConfiguration.getInteger("maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION)); - config.setPartitionCount(storeConfiguration.getInteger("partitionCount",DEFAULT_PARTITION_COUNT)); + + config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, "minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION)); + config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, "maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION)); + config.setPartitionCount(getIntegerAttribute(virtualHost, "partitionCount",DEFAULT_PARTITION_COUNT)); _connectionPool = new BoneCP(config); } + private int getIntegerAttribute(VirtualHost virtualHost, String attributeName, int defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + if(attrValue instanceof Number) + { + return ((Number) attrValue).intValue(); + } + else if(attrValue instanceof String) + { + try + { + return Integer.parseInt((String)attrValue); + } + catch (NumberFormatException e) + { + return defaultVal; + } + } + + } + return defaultVal; + } + @Override public Connection getConnection() throws SQLException { diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java index 71c59c7772..73876eceb4 100644 --- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java +++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.jdbc.bonecp; import java.sql.SQLException; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.jdbc.ConnectionProvider; @@ -34,9 +35,9 @@ public class BoneCPConnectionProviderFactory implements JDBCConnectionProviderFa } @Override - public ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration) + public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException { - return new BoneCPConnectionProvider(connectionUrl, storeConfiguration); + return new BoneCPConnectionProvider(connectionUrl, virtualHost); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 383ff2f3f6..68177dac62 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -144,7 +144,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { validateAttributes(type); } - } + }/* else { if (type != null) @@ -152,7 +152,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual invalidAttributes = true; } - } + }*/ if (invalidAttributes) { throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes"); @@ -1109,9 +1109,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual else { _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(), - _brokerStatisticsGatherer, - _broker.getSecurityManager(), - configuration); + _brokerStatisticsGatherer, + _broker.getSecurityManager(), + configuration, + this); } } catch (Exception e) @@ -1172,6 +1173,17 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new IllegalConfigurationException("Configuration file '" + configurationFile + "' does not exist"); } configuration = new VirtualHostConfiguration(virtualHostName, new File(configurationFile) , _broker); + String type = configuration.getType(); + changeAttribute(TYPE,null,type); + VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); + if(factory != null) + { + for(Map.Entry<String,Object> entry : factory.convertVirtualHostConfiguration(configuration.getConfig()).entrySet()) + { + changeAttribute(entry.getKey(), getAttribute(entry.getKey()), entry.getValue()); + } + } + } return configuration; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java index e3b7f03978..a5f4ea063b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java @@ -26,13 +26,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.jdbc.ConnectionProvider; public interface JDBCConnectionProviderFactory { String getType(); - ConnectionProvider getConnectionProvider(String connectionUrl, Configuration storeConfiguration) + ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException; static final class TYPES diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java index aff06af4ce..9297f34f94 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageStoreFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.plugin; +import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.store.MessageStore; public interface MessageStoreFactory @@ -27,4 +29,7 @@ public interface MessageStoreFactory String getType(); MessageStore createMessageStore(); + + public Map<String, Object> convertStoreConfiguration(Configuration configuration); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java index f952e0410c..2a3b65f829 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/VirtualHostFactory.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.model.adapter.VirtualHostAdapter; import org.apache.qpid.server.security.SecurityManager; @@ -39,12 +40,15 @@ public interface VirtualHostFactory VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) throws Exception; + VirtualHostConfiguration hostConfig, + org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception; void validateAttributes(Map<String, Object> attributes); Map<String, Object> createVirtualHostConfiguration(VirtualHostAdapter virtualHostAdapter); + Map<String,Object> convertVirtualHostConfiguration(Configuration configuration); + static final class TYPES { private TYPES() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index b7f5035de0..758945d6a1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -41,7 +40,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; @@ -50,6 +48,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; @@ -170,21 +169,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override public void configureConfigStore(String name, - ConfigurationRecoveryHandler configRecoveryHandler, - Configuration storeConfiguration) throws Exception + ConfigurationRecoveryHandler configRecoveryHandler, + VirtualHost virtualHost) throws Exception { _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = configRecoveryHandler; - commonConfiguration(name, storeConfiguration); + commonConfiguration(name, virtualHost); } @Override public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration storeConfiguration) throws Exception + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _tlogRecoveryHandler = tlogRecoveryHandler; _messageRecoveryHandler = recoveryHandler; @@ -206,15 +204,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC _stateManager.attainState(State.ACTIVE); } - private void commonConfiguration(String name, Configuration storeConfiguration) + private void commonConfiguration(String name, VirtualHost virtualHost) throws ClassNotFoundException, SQLException { - implementationSpecificConfiguration(name, storeConfiguration); + implementationSpecificConfiguration(name, virtualHost); createOrOpenDatabase(); } - protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException, SQLException; + protected abstract void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) throws ClassNotFoundException, SQLException; abstract protected Logger getLogger(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 27a40963f6..7092655e95 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; public interface DurableConfigurationStore @@ -40,14 +41,16 @@ public interface DurableConfigurationStore * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. * + * + * * @param name The name to be used by this store * @param recoveryHandler Handler to be called as the store recovers on start up - * @param config The apache commons configuration object. + * @param virtualHost * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, - Configuration config) throws Exception; + VirtualHost virtualHost) throws Exception; /** * Makes the specified exchange persistent. * diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 3f1d1b9530..9b2496f262 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.model.VirtualHost; /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class MemoryMessageStore extends NullMessageStore @@ -82,13 +82,17 @@ public class MemoryMessageStore extends NullMessageStore } @Override - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception { _stateManager.attainState(State.INITIALISING); } @Override - public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + public void configureMessageStore(String name, + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISED); } @@ -97,7 +101,7 @@ public class MemoryMessageStore extends NullMessageStore public void activate() throws Exception { _stateManager.attainState(State.ACTIVATING); - + _stateManager.attainState(State.ACTIVE); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java index dceba4af31..476b2f127d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.plugin.MessageStoreFactory; public class MemoryMessageStoreFactory implements MessageStoreFactory @@ -37,4 +40,10 @@ public class MemoryMessageStoreFactory implements MessageStoreFactory return new MemoryMessageStore(); } + @Override + public Map<String, Object> convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index bbdfaf4959..5fc6bad368 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -32,16 +32,15 @@ public interface MessageStore * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. * + * * @param name The name to be used by this store * @param messageRecoveryHandler Handler to be called as the store recovers on start up * @param tlogRecoveryHandler - * @param config The apache commons configuration object. * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception; + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception; void activate() throws Exception; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java index 728da23f28..93b669e6e4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java @@ -25,5 +25,7 @@ public class MessageStoreConstants public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; public static final String OVERFULL_SIZE_PROPERTY = "overfull-size"; public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size"; + public static final String OVERFULL_SIZE_ATTRIBUTE = "storeOverfullSize"; + public static final String UNDERFULL_SIZE_ATTRIBUTE = "storeUnderfullSize"; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index f0936a221c..3b57bbfa55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -19,11 +19,11 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore @@ -31,7 +31,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, - Configuration config) throws Exception + VirtualHost virtualHost) throws Exception { } @@ -77,8 +77,8 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura @Override public void configureMessageStore(String name, - MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 0d4231a10d..ac310d02c9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -32,8 +32,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; @@ -123,15 +123,20 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } @Override - protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration) + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) throws ClassNotFoundException { //Update to pick up QPID_WORK and use that as the default location not just derbyDB _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") - + File.separator + "derbyDB"); + String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; + String databasePath = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + if(databasePath == null) + { + databasePath = defaultPath; + } if(!MEMORY_STORE_LOCATION.equals(databasePath)) { @@ -148,8 +153,14 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa _storeLocation = databasePath; - _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l); - _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeHighThreshold = overfullAttr == null ? -1l : + overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); + _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : + underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index 81e5bb9ff9..0f53d66435 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.derby; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.store.MessageStore; @@ -38,4 +41,10 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory return new DerbyMessageStore(); } + @Override + public Map<String, Object> convertStoreConfiguration(Configuration configuration) + { + return Collections.emptyMap(); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java index 0f074cc95b..a2d3644590 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store.jdbc; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; public class DefaultConnectionProviderFactory implements JDBCConnectionProviderFactory @@ -34,7 +35,7 @@ public class DefaultConnectionProviderFactory implements JDBCConnectionProviderF @Override public ConnectionProvider getConnectionProvider(String connectionUrl, - Configuration storeConfiguration) + VirtualHost virtualHost) { return new DefaultConnectionProvider(connectionUrl); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 79093fe2e2..951ea28c20 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -29,13 +29,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; @@ -275,13 +274,15 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } - protected void implementationSpecificConfiguration(String name, Configuration storeConfiguration) + protected void implementationSpecificConfiguration(String name, + VirtualHost virtualHost) throws ClassNotFoundException, SQLException { - String connectionURL = storeConfiguration.getString("connectionUrl", - storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); + String connectionURL = virtualHost.getAttribute("connectionURL") == null + ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) + : String.valueOf(virtualHost.getAttribute("connectionURL")); JDBCDetails details = null; @@ -301,8 +302,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } - Configuration poolConfig = storeConfiguration.subset("pool"); - String connectionPoolType = poolConfig.getString("type", "DEFAULT"); + Object poolAttribute = virtualHost.getAttribute("connectionPool"); + String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); + JDBCConnectionProviderFactory connectionProviderFactory = JDBCConnectionProviderFactory.FACTORIES.get(connectionPoolType); if(connectionProviderFactory == null) @@ -311,14 +313,44 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag connectionProviderFactory = new DefaultConnectionProviderFactory(); } - _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, poolConfig); + _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); + + _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); + _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); + _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); + _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); + } + + + private String getStringAttribute(VirtualHost virtualHost, String attributeName, String defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + return attrValue.toString(); + } + return defaultVal; + } + + private boolean getBooleanAttribute(VirtualHost virtualHost, String attributeName, boolean defaultVal) + { + Object attrValue = virtualHost.getAttribute(attributeName); + if(attrValue != null) + { + if(attrValue instanceof Boolean) + { + return ((Boolean) attrValue).booleanValue(); + } + else if(attrValue instanceof String) + { + return Boolean.parseBoolean((String)attrValue); + } - _blobType = storeConfiguration.getString("sqlBlobType",details.getBlobType()); - _varBinaryType = storeConfiguration.getString("sqlVarbinaryType",details.getVarBinaryType()); - _useBytesMethodsForBlob = storeConfiguration.getBoolean("useBytesForBlob",details.isUseBytesMethodsForBlob()); - _bigIntType = storeConfiguration.getString("sqlBigIntType", details.getBigintType()); + } + return defaultVal; } + protected void storedSizeChange(int contentSize) { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java index 1446ad34e9..99ec4c7d32 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.jdbc; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.store.MessageStore; @@ -38,4 +41,25 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory return new JDBCMessageStore(); } + @Override + public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) + { + Map<String,Object> convertedMap = new HashMap<String,Object>(); + convertedMap.put("jdbcBlobType", storeConfiguration.getString("sqlBlobType")); + convertedMap.put("jdbcVarbinaryType", storeConfiguration.getString("sqlVarbinaryType")); + if(storeConfiguration.containsKey("useBytesForBlob")) + { + convertedMap.put("jdbcUseBytesForBlob", storeConfiguration.getBoolean("useBytesForBlob")); + } + convertedMap.put("jdbcBigIntType", storeConfiguration.getString("sqlBigIntType")); + convertedMap.put("connectionPool", storeConfiguration.getString("pool.type")); + convertedMap.put("minConnectionsPerPartition", storeConfiguration.getInteger("pool.minConnectionsPerPartition", + null)); + convertedMap.put("maxConnectionsPerPartition", storeConfiguration.getInteger("pool.maxConnectionsPerPartition", + null)); + convertedMap.put("partitionCount", storeConfiguration.getInteger("pool.partitionCount", null)); + + return convertedMap; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 6116d46e41..a704ca112b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -57,12 +56,8 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreCreator; -import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener @@ -107,7 +102,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) throws Exception + VirtualHostConfiguration hostConfig, + org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception { if (hostConfig == null) { @@ -144,13 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg initialiseStatistics(); - initialiseStorage(hostConfig); + initialiseStorage(hostConfig, virtualHost); getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } - abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception; + abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig, + org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception; public IConnectionRegistry getConnectionRegistry() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 05a33e7d99..82be0c01e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -19,12 +19,11 @@ package org.apache.qpid.server.virtualhost;/* * */ -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.OperationalLoggingListener; @@ -36,18 +35,19 @@ public class StandardVirtualHost extends AbstractVirtualHost private DurableConfigurationStore _durableConfigurationStore; StandardVirtualHost(VirtualHostRegistry virtualHostRegistry, - StatisticsGatherer brokerStatisticsGatherer, - org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) throws Exception + StatisticsGatherer brokerStatisticsGatherer, + org.apache.qpid.server.security.SecurityManager parentSecurityManager, + VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { - super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost); } - private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { - String storeType = hostConfig.getConfig().getString("store.type"); + final Object storeTypeAttr = virtualHost.getAttribute(VirtualHost.STORE_TYPE); + String storeType = storeTypeAttr == null ? null : String.valueOf(storeTypeAttr); MessageStore messageStore = null; if (storeType == null) { @@ -74,7 +74,7 @@ public class StandardVirtualHost extends AbstractVirtualHost return messageStore; } - private DurableConfigurationStore initialiseConfigurationStore(VirtualHostConfiguration hostConfig) throws Exception + private DurableConfigurationStore initialiseConfigurationStore(VirtualHost virtualHost) throws Exception { DurableConfigurationStore configurationStore; if(getMessageStore() instanceof DurableConfigurationStore) @@ -90,19 +90,17 @@ public class StandardVirtualHost extends AbstractVirtualHost } - protected void initialiseStorage(VirtualHostConfiguration hostConfig) throws Exception + protected void initialiseStorage(VirtualHostConfiguration hostConfig, VirtualHost virtualHost) throws Exception { - _messageStore = initialiseMessageStore(hostConfig); + _messageStore = initialiseMessageStore(hostConfig, virtualHost); - _durableConfigurationStore = initialiseConfigurationStore(hostConfig); + _durableConfigurationStore = initialiseConfigurationStore(virtualHost); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - final Configuration storeConfiguration = hostConfig.getStoreConfiguration(); + _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, virtualHost); - _durableConfigurationStore.configureConfigStore(getName(), recoveryHandler, storeConfiguration); - - _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, storeConfiguration); + _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler); initialiseModel(hostConfig); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java index 3a604dbd90..a0f22aa34c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -19,13 +19,21 @@ package org.apache.qpid.server.virtualhost;/* * */ +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.model.adapter.VirtualHostAdapter; +import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreCreator; public class StandardVirtualHostFactory implements VirtualHostFactory @@ -43,9 +51,10 @@ public class StandardVirtualHostFactory implements VirtualHostFactory public VirtualHost createVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, org.apache.qpid.server.security.SecurityManager parentSecurityManager, - VirtualHostConfiguration hostConfig) throws Exception + VirtualHostConfiguration hostConfig, + org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception { - return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig); + return new StandardVirtualHost(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, hostConfig, virtualHost); } @@ -94,13 +103,26 @@ public class StandardVirtualHostFactory implements VirtualHostFactory convertedMap.put("store.type", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)); convertedMap.put("store.environment-path", virtualHostAdapter.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH)); - // TODO - this should all be inverted to populate vhost from xml and then pass model object to the store + return convertedMap; + } + + @Override + public Map<String, Object> convertVirtualHostConfiguration(Configuration configuration) + { + Map<String,Object> convertedMap = new LinkedHashMap<String, Object>(); + Configuration storeConfiguration = configuration.subset("store"); + convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_TYPE, storeConfiguration.getString("type")); + convertedMap.put(org.apache.qpid.server.model.VirtualHost.STORE_PATH, storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY)); - convertedMap.put("store.pool.type",virtualHostAdapter.getAttribute("connectionPool")); - convertedMap.put("store.pool.minConnectionsPerPartition",virtualHostAdapter.getAttribute("minConnectionsPerPartition")); - convertedMap.put("store.pool.maxConnectionsPerPartition",virtualHostAdapter.getAttribute("maxConnectionsPerPartition")); - convertedMap.put("store.pool.partitionCount",virtualHostAdapter.getAttribute("partitionCount")); + convertedMap.put(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.OVERFULL_SIZE_PROPERTY)); + convertedMap.put(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, storeConfiguration.getString(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY)); + + for(MessageStoreFactory mf : new MessageStoreCreator().getFactories()) + { + convertedMap.putAll(mf.convertStoreConfiguration(storeConfiguration)); + } return convertedMap; + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index 8a7d5d85fc..57e1fcd15c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockStoredMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; @@ -67,6 +68,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase private String _storeName; private MessageStore _messageStore; private Configuration _configuration; + private VirtualHost _virtualHost; private ConfigurationRecoveryHandler _recoveryHandler; private QueueRecoveryHandler _queueRecoveryHandler; @@ -107,6 +109,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); @@ -118,6 +121,7 @@ public class DurableConfigurationStoreTest extends QpidTestCase when(_exchange.getId()).thenReturn(_exchangeId); when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( _storePath); + when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); _bindingArgs = new FieldTable(); AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); @@ -314,8 +318,8 @@ public class DurableConfigurationStoreTest extends QpidTestCase _messageStore = createMessageStore(); _configStore = createConfigStore(); - _configStore.configureConfigStore(_storeName, _recoveryHandler, _configuration); - _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); + _configStore.configureConfigStore(_storeName, _recoveryHandler, _virtualHost); + _messageStore.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler); _messageStore.activate(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 8743c4111b..f57195b2d7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -37,9 +36,14 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource { private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class); @@ -54,7 +58,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple protected abstract MessageStore createStore() throws Exception; - protected abstract void applyStoreSpecificConfiguration(XMLConfiguration config); + protected abstract void applyStoreSpecificConfiguration(VirtualHost virtualHost); protected abstract int getNumberOfMessagesToFillStore(); @@ -66,12 +70,13 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple _storeLocation = new File(new File(TMP_FOLDER), getTestName()); FileUtils.delete(_storeLocation, true); - XMLConfiguration config = new XMLConfiguration(); - config.addProperty("environment-path", _storeLocation.getAbsolutePath()); - applyStoreSpecificConfiguration(config); + + VirtualHost vhost = mock(VirtualHost.class); + when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath()); + applyStoreSpecificConfiguration(vhost); _store = createStore(); - ((DurableConfigurationStore)_store).configureConfigStore("test", null, config); + ((DurableConfigurationStore)_store).configureConfigStore("test", null, vhost); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index fb255e89f9..5eea002365 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -62,6 +62,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * This tests the MessageStores by using the available interfaces. * @@ -98,26 +102,42 @@ public class MessageStoreTest extends QpidTestCase private PropertiesConfiguration _config; private VirtualHost _virtualHost; + private org.apache.qpid.server.model.VirtualHost _virtualHostModel; private Broker _broker; + private String _storePath; public void setUp() throws Exception { super.setUp(); BrokerTestHelper.setUp(); - String storePath = System.getProperty("QPID_WORK") + File.separator + getName(); + _storePath = System.getProperty("QPID_WORK") + File.separator + getName(); _config = new PropertiesConfiguration(); _config.addProperty("store.class", getTestProfileMessageStoreClassName()); - _config.addProperty("store.environment-path", storePath); + _config.addProperty("store.environment-path", _storePath); + _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); + when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath); + + - cleanup(new File(storePath)); + cleanup(new File(_storePath)); _broker = BrokerTestHelper.createBrokerMock(); reloadVirtualHost(); } + protected String getStorePath() + { + return _storePath; + } + + protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel() + { + return _virtualHostModel; + } + @Override public void tearDown() throws Exception { @@ -164,7 +184,7 @@ public class MessageStoreTest extends QpidTestCase try { - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker)); + _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),null,getVirtualHostModel()); } catch (Exception e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 88d5852a17..a73057ebc1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -25,8 +25,7 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; @@ -44,9 +43,9 @@ public abstract class MessageStoreTestCase extends QpidTestCase private TransactionLogRecoveryHandler _logRecoveryHandler; private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; + private VirtualHost _virtualHost; private MessageStore _store; - private Configuration _storeConfiguration; public void setUp() throws Exception { @@ -61,6 +60,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); @@ -69,15 +69,15 @@ public abstract class MessageStoreTestCase extends QpidTestCase when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - _storeConfiguration = new PropertiesConfiguration(); - setUpStoreConfiguration(_storeConfiguration); + setUpStoreConfiguration(_virtualHost); _store = createMessageStore(); - ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _storeConfiguration); - _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler, _storeConfiguration); + ((DurableConfigurationStore)_store).configureConfigStore(getTestName(), _recoveryHandler, _virtualHost); + + _store.configureMessageStore(getTestName(), _messageStoreRecoveryHandler, _logRecoveryHandler); } - protected abstract void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception; + protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception; protected abstract MessageStore createMessageStore(); @@ -86,8 +86,4 @@ public abstract class MessageStoreTestCase extends QpidTestCase return _store; } - public Configuration getStoreConfiguration() - { - return _storeConfiguration; - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index 5d316fca43..479675dac1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -20,11 +20,15 @@ */ package org.apache.qpid.server.store.derby; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); @@ -46,12 +50,12 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes } @Override - protected void applyStoreSpecificConfiguration(XMLConfiguration config) + protected void applyStoreSpecificConfiguration(VirtualHost vhost) { _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - config.addProperty("overfull-size", OVERFULL_SIZE); - config.addProperty("underfull-size", UNDERFULL_SIZE); + when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); + when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 1747588bf1..859fad629b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -22,11 +22,14 @@ package org.apache.qpid.server.store.derby; import java.io.File; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.util.FileUtils; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + public class DerbyMessageStoreTest extends MessageStoreTestCase { private String _storeLocation; @@ -57,10 +60,10 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase } @Override - protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception { _storeLocation = TMP_FOLDER + File.separator + getTestName(); - storeConfiguration.setProperty("environment-path", _storeLocation); + when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); deleteStoreIfExists(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index bb118eaaf7..a8e0460cea 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -28,11 +28,14 @@ import java.sql.SQLException; import java.util.HashSet; import java.util.Set; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.server.store.derby.DerbyMessageStore; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + public class JDBCMessageStoreTest extends MessageStoreTestCase { private String _connectionURL; @@ -61,10 +64,11 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase } @Override - protected void setUpStoreConfiguration(Configuration storeConfiguration) throws Exception + protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; - storeConfiguration.addProperty("connectionUrl", _connectionURL); + + when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); } @Override diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 1c8939d117..7811d04997 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -58,7 +58,6 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.plugin.VirtualHostFactory; -import org.apache.qpid.server.virtualhost.VirtualHostFactoryRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class BrokerTestHelper @@ -98,6 +97,12 @@ public class BrokerTestHelper public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry) throws Exception { + return createVirtualHost(virtualHostConfiguration, virtualHostRegistry, mock(org.apache.qpid.server.model.VirtualHost.class)); + } + + public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration, VirtualHostRegistry virtualHostRegistry, org.apache.qpid.server.model.VirtualHost modelVHost) + throws Exception + { StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class); final VirtualHostFactory factory = virtualHostConfiguration == null ? new StandardVirtualHostFactory() @@ -105,18 +110,18 @@ public class BrokerTestHelper VirtualHost host = factory.createVirtualHost(virtualHostRegistry, statisticsGatherer, new SecurityManager(mock(Broker.class), false), - virtualHostConfiguration); - virtualHostRegistry.registerVirtualHost(host); + virtualHostConfiguration, + modelVHost); + if(virtualHostRegistry != null) + { + virtualHostRegistry.registerVirtualHost(host); + } return host; } public static VirtualHost createVirtualHost(VirtualHostConfiguration virtualHostConfiguration) throws Exception { - final VirtualHostFactory factory = - virtualHostConfiguration == null ? new StandardVirtualHostFactory() - : VirtualHostFactory.FACTORIES.get(virtualHostConfiguration.getType()); - - return factory.createVirtualHost(null, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), virtualHostConfiguration); + return createVirtualHost(virtualHostConfiguration, null); } public static VirtualHost createVirtualHost(String name, VirtualHostRegistry virtualHostRegistry) throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index 1243d9f7dd..ae09e8d7e7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -264,7 +264,8 @@ public class StandardVirtualHostTest extends QpidTestCase _virtualHostRegistry = broker.getVirtualHostRegistry(); VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker); - VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, + mock(org.apache.qpid.server.model.VirtualHost.class)); _virtualHostRegistry.registerVirtualHost(host); return host; @@ -364,7 +365,8 @@ public class StandardVirtualHostTest extends QpidTestCase Configuration config = new PropertiesConfiguration(); config.setProperty("store.type", MemoryMessageStore.TYPE); VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); - VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, + mock(org.apache.qpid.server.model.VirtualHost.class)); _virtualHostRegistry.registerVirtualHost(host); return host; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 07965cfa95..69efb7e310 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -23,10 +23,10 @@ package org.apache.qpid.server.store; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.model.VirtualHost; public class QuotaMessageStore extends NullMessageStore { @@ -47,12 +47,27 @@ public class QuotaMessageStore extends NullMessageStore } @Override - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) + public void configureConfigStore(String name, + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception { - _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); - _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, - _persistentSizeHighThreshold); + Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + _persistentSizeHighThreshold = overfullAttr == null + ? Long.MAX_VALUE + : overfullAttr instanceof Number + ? ((Number)overfullAttr).longValue() + : Long.parseLong(overfullAttr.toString()); + + Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + + _persistentSizeLowThreshold = overfullAttr == null + ? _persistentSizeHighThreshold + : underfullAttr instanceof Number + ? ((Number)underfullAttr).longValue() + : Long.parseLong(underfullAttr.toString()); + + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; @@ -62,7 +77,7 @@ public class QuotaMessageStore extends NullMessageStore @Override public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { _stateManager.attainState(State.INITIALISED); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index ed76c40717..76250e126a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; +import java.util.Collections; +import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; @@ -29,11 +30,11 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.Iterator; public class SlowMessageStore implements MessageStore, DurableConfigurationStore { @@ -51,19 +52,22 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore // ***** MessageStore Interface. public void configureConfigStore(String name, - ConfigurationRecoveryHandler recoveryHandler, - Configuration config) throws Exception + ConfigurationRecoveryHandler recoveryHandler, + VirtualHost virtualHost) throws Exception { _logger.info("Starting SlowMessageStore on Virtualhost:" + name); - Configuration delays = config.subset(DELAYS); + Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays"); + + Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); configureDelays(delays); - String messageStoreClass = config.getString("realStore"); + final Object realStoreAttr = virtualHost.getAttribute("realStore"); + String messageStoreClass = realStoreAttr == null ? null : realStoreAttr.toString(); if (delays.containsKey(DEFAULT_DELAY)) { - _defaultDelay = delays.getLong(DEFAULT_DELAY); + _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); } if (messageStoreClass != null) @@ -83,25 +87,23 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _durableConfigurationStore = (DurableConfigurationStore)o; } } - _durableConfigurationStore.configureConfigStore(name, recoveryHandler, config); + _durableConfigurationStore.configureConfigStore(name, recoveryHandler, virtualHost); } - private void configureDelays(Configuration config) + private void configureDelays(Map<Object, Object> config) { - @SuppressWarnings("unchecked") - Iterator<String> delays = config.getKeys(); - while (delays.hasNext()) + for(Map.Entry<Object, Object> entry : config.entrySet()) { - String key = (String) delays.next(); - if (key.endsWith(PRE)) + String key = String.valueOf(entry.getKey()); + if (key.startsWith(PRE)) { - _preDelays.put(key.substring(0, key.length() - PRE.length() - 1), config.getLong(key)); + _preDelays.put(key.substring(PRE.length()), Long.parseLong(String.valueOf(entry.getValue()))); } - else if (key.endsWith(POST)) + else if (key.startsWith(POST)) { - _postDelays.put(key.substring(0, key.length() - POST.length() - 1), config.getLong(key)); + _postDelays.put(key.substring(POST.length()), Long.parseLong(String.valueOf(entry.getValue()))); } } } @@ -156,10 +158,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler, - Configuration config) throws Exception + TransactionLogRecoveryHandler tlogRecoveryHandler) throws Exception { - _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + _realStore.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler); } public void close() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java new file mode 100644 index 0000000000..a798e6d50e --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java @@ -0,0 +1,78 @@ +package org.apache.qpid.server.store;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.plugin.MessageStoreFactory; + +public class SlowMessageStoreFactory implements MessageStoreFactory +{ + @Override + public String getType() + { + return "SLOW"; + } + + @Override + public MessageStore createMessageStore() + { + return new SlowMessageStore(); + } + + @Override + public Map<String, Object> convertStoreConfiguration(Configuration storeConfiguration) + { + Map<String, Object> convertedMap = new HashMap<String, Object>(); + Configuration delaysConfig = storeConfiguration.subset("delays"); + + @SuppressWarnings("unchecked") + Iterator<String> delays = delaysConfig.getKeys(); + + Map<String,Long> delaysMap = new HashMap<String, Long>(); + + while (delays.hasNext()) + { + String key = delays.next(); + + if (key.endsWith("pre")) + { + delaysMap.put("pre"+key.substring(0, key.length() - 4), delaysConfig.getLong(key)); + } + else if (key.endsWith("post")) + { + delaysMap.put("post"+key.substring(0, key.length() - 5), delaysConfig.getLong(key)); + } + } + + if(!delaysMap.isEmpty()) + { + convertedMap.put("slowMessageStoreDelays",delaysMap); + } + + + convertedMap.put("realStore", storeConfiguration.getString("realStore", null)); + + + return convertedMap; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 4a81480671..182cd5ff0c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -55,10 +55,10 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase public void setUp() throws Exception { - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".type", - StandardVirtualHostFactory.TYPE); - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore"); - setVirtualHostConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); + final String prefix = "virtualhosts.virtualhost." + VIRTUALHOST; + setVirtualHostConfigurationProperty(prefix + ".type", StandardVirtualHostFactory.TYPE); + setVirtualHostConfigurationProperty(prefix + ".store.class", "org.apache.qpid.server.store.SlowMessageStore"); + setVirtualHostConfigurationProperty(prefix + ".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY)); super.setUp(); diff --git a/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory new file mode 100644 index 0000000000..fdd7a904c3 --- /dev/null +++ b/qpid/java/systests/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.store.SlowMessageStoreFactory |