summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
committerKeith Wall <kwall@apache.org>2014-03-14 16:39:47 +0000
commitec486999608568e37a55dc9c81d9be133d95ebc3 (patch)
tree87d6446e97cfdca321b1faff6f24a3010df4cdff
parentdb26915f9b2edfa410c094162bec78b9d2010b24 (diff)
downloadqpid-python-ec486999608568e37a55dc9c81d9be133d95ebc3.tar.gz
QPID-5624: Introduce messageStoreSettings VH attribute and move all message store related attributes into messageStoreSettings map
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577606 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java19
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java50
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java41
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java27
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java38
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java14
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java2
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java106
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java78
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java18
-rw-r--r--qpid/java/broker-core/src/main/resources/initial-config.json8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java217
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java17
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java10
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java25
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java9
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java33
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java11
-rw-r--r--qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java32
-rw-r--r--qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java5
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java2
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java43
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java15
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java9
-rw-r--r--qpid/java/systests/etc/config-systests.json8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java80
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java19
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java32
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java24
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java24
54 files changed, 939 insertions, 366 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 16199d30a3..24d7513c5f 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory;
import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
@@ -48,8 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@Override
public boolean isChildManageableByMBean(ConfiguredObject child)
{
- return (child instanceof VirtualHost
- && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ return (child instanceof VirtualHost && BDBHAVirtualHostFactory.TYPE.equals(child.getType()));
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index b2ec96f9f8..6fb84b8a4d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -52,11 +54,18 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes);
- validateAttribute("haGroupName", String.class, attributes);
- validateAttribute("haNodeName", String.class, attributes);
- validateAttribute("haNodeAddress", String.class, attributes);
- validateAttribute("haHelperAddress", String.class, attributes);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings);
+ validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings);
}
private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 35dae4b800..c8550b2114 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.*;
-import com.sleepycat.je.Transaction;
-
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -36,16 +30,32 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StateManager;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -59,6 +69,21 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
import org.apache.qpid.util.FileUtils;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
*
@@ -72,8 +97,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
public static final int VERSION = 7;
- public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
-
private static final int LOCK_RETRY_ATTEMPTS = 5;
private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS";
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
@@ -119,7 +142,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
{
- _type = environmentFacadeFactory.getType();;
+ _type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
_stateManager = new StateManager(_eventManager);
}
@@ -218,8 +241,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
{
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
index 8f2086a25c..e2b30f6740 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
@@ -52,12 +52,14 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH
+"' is required and must be of type String.");
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index b784e436b9..d242790efb 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -24,8 +24,9 @@ import org.apache.qpid.server.model.VirtualHost;
public interface EnvironmentFacadeFactory
{
+ public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore);
+ EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore);
String getType();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 384ceba98a..7fdae6b3ee 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -26,30 +26,32 @@ import java.util.Map;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ String name = virtualHost.getName();
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION);
+ Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
- String name = virtualHost.getName();
final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
String storeLocation;
if(isMessageStore)
{
- storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index cd53afe891..4df62b1d0f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Map;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
@@ -32,47 +33,56 @@ import com.sleepycat.je.Durability.SyncPolicy;
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
-
+ public static final String DURABILITY = "haDurability";
+ public static final String GROUP_NAME = "haGroupName";
+ public static final String HELPER_ADDRESS = "haHelperAddress";
+ public static final String NODE_ADDRESS = "haNodeAddress";
+ public static final String NODE_NAME = "haNodeName";
+ public static final String REPLICATION_CONFIG = "haReplicationConfig";
+ public static final String COALESCING_SYNC = "haCoalescingSync";
+ public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary";
+
private static final int DEFAULT_NODE_PRIORITY = 1;
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
ReplicaAckPolicy.SIMPLE_MAJORITY);
private static final boolean DEFAULT_COALESCING_SYNC = true;
-
-
@Override
- public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
{
+ final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@Override
public boolean isDesignatedPrimary()
{
- return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false);
+ return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false);
}
@Override
public boolean isCoalescingSync()
{
- return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC);
+ return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC);
}
@Override
public String getStorePath()
{
- return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ return (String) messageStoreSettings.get(MessageStore.STORE_PATH);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig");
+ return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, String> getReplicationParameters()
{
- return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig");
+ return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG);
}
@Override
@@ -87,36 +97,35 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return DEFAULT_NODE_PRIORITY;
}
-
-
@Override
public String getName()
{
- return (String)virtualHost.getAttribute("haNodeName");
+ return (String)messageStoreSettings.get(NODE_NAME);
}
@Override
public String getHostPort()
{
- return (String)virtualHost.getAttribute("haNodeAddress");
+ return (String)messageStoreSettings.get(NODE_ADDRESS);
}
@Override
public String getHelperHostPort()
{
- return (String)virtualHost.getAttribute("haHelperAddress");
+ return (String)messageStoreSettings.get(HELPER_ADDRESS);
}
@Override
public String getGroupName()
{
- return (String)virtualHost.getAttribute("haGroupName");
+ return (String)messageStoreSettings.get(GROUP_NAME);
}
@Override
public String getDurability()
{
- return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability");
+ String durability = (String)messageStoreSettings.get(DURABILITY);
+ return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
return new ReplicatedEnvironmentFacade(configuration);
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index 4684358190..65830fd1c2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class);
@@ -59,16 +60,22 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
}
+
@Override
- protected void applyStoreSpecificConfiguration(VirtualHost virtualHost)
+ protected VirtualHost<?> createVirtualHost(String storeLocation)
{
- _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+ VirtualHost<?> vhost = mock(VirtualHost.class);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
+ messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
+ messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
- when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
- when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
-
+ messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap);
+ when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
+ when(vhost.getName()).thenReturn("test");
+ return vhost;
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index da34e191f7..2caf85966c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -38,7 +38,9 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -101,33 +103,31 @@ public class VirtualHostTest extends QpidTestCase
String nodeHostPort = "localhost:" + findFreePort();
String helperHostPort = nodeHostPort;
String durability = "NO_SYNC,SYNC,NONE";
- String hostName = getName();
+ String virtualHostName = getName();
- Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
- virtualHostAttributes.put("haNodeName", nodeName);
- virtualHostAttributes.put("haGroupName", groupName);
- virtualHostAttributes.put("haNodeAddress", nodeHostPort);
- virtualHostAttributes.put("haHelperAddress", helperHostPort);
- virtualHostAttributes.put("haDurability", durability);
- virtualHostAttributes.put(VirtualHost.STORE_PATH, _bdbStorePath.getAbsolutePath());
- virtualHostAttributes.put("haReplicationConfig",
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability);
+
+ messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath());
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG,
Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout));
- virtualHostAttributes.put(VirtualHost.NAME, hostName);
+
+ Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put(VirtualHost.NAME, virtualHostName);
virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
_host = createHost(virtualHostAttributes);
_host.setDesiredState(State.INITIALISING, State.ACTIVE);
- assertEquals("Unexpected host name", hostName, _host.getName());
+ assertEquals("Unexpected virtual host name", virtualHostName, _host.getName());
assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType());
- assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE));
-
- assertEquals(nodeName, _host.getAttribute("haNodeName"));
- assertEquals(groupName, _host.getAttribute("haGroupName"));
- assertEquals(nodeHostPort, _host.getAttribute("haNodeAddress"));
- assertEquals(helperHostPort, _host.getAttribute("haHelperAddress"));
- assertEquals(durability, _host.getAttribute("haDurability"));
- assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH));
+
+ assertEquals(messageStoreSettings, _host.getMessageStoreSettings());
BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore();
ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment();
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
index b6a178ac8a..67c89718f6 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java
@@ -32,6 +32,7 @@ import javax.jms.Session;
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.test.utils.Piper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
@@ -61,7 +62,8 @@ public class BDBBackupTest extends QpidBrokerTestCase
_backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName());
_backupToDir.mkdirs();
Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TEST_VHOST);
- _backupFromDir = new File((String)virtualHostAttributes.get(VirtualHost.STORE_PATH));
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ _backupFromDir = new File((String)messageStoreSettings.get(MessageStore.STORE_PATH));
boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory();
assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir);
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 3d6a2bac67..cb56a60119 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularDataSupport;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -75,7 +76,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
private static final String QUEUE_NAME="myUpgradeQueue";
private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
private static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
- private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
+ private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
private String _storeLocation;
@@ -84,7 +85,9 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST);
- _storeLocation = (String)virtualHostAttributes.get(VirtualHost.STORE_PATH);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ _storeLocation = (String)messageStoreSettings.get(MessageStore.STORE_PATH);
//Clear the two target directories if they exist.
File directory = new File(_storeLocation);
@@ -102,11 +105,6 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
super.setUp();
}
- private String getWorkDirBaseDir()
- {
- return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
- }
-
/**
* Test that the selector applied to the DurableSubscription was successfully
* transfered to the new store, and functions as expected with continued use
@@ -505,7 +503,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
return send;
}
-
+
/**
* Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
*
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index bef35e163f..e8d18971ad 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -184,7 +184,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
final int newBdbPort = getNextAvailable(oldBdbPort + 1);
- storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), "localhost", newBdbPort);
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 1a65b095b4..4efe1967ce 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -43,8 +43,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.url.URLSyntaxException;
import com.sleepycat.je.rep.ReplicationConfig;
@@ -101,19 +103,22 @@ public class HATestClusterCreator
_bdbHelperPort = bdbPort;
}
- TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
- brokerConfiguration.addJmxManagementConfiguration();
String nodeName = getNodeNameForNodeAt(bdbPort);
- brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
- brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haGroupName", _groupName);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeName", nodeName);
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeAddress", getNodeHostPortForNodeAt(bdbPort));
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haHelperAddress", getHelperHostPort());
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort));
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort());
Map<String, String> repSettings = new HashMap<String, String>();
repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
- brokerConfiguration.setObjectAttribute(_virtualHostName, "haReplicationConfig", repSettings );
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings );
+
+ TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
+ brokerConfiguration.addJmxManagementConfiguration();
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
}
@@ -127,7 +132,10 @@ public class HATestClusterCreator
throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
}
TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort);
- config.setObjectAttribute("test", "haDesignatedPrimary", designatedPrimary);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
config.setSaved(false);
}
@@ -360,12 +368,15 @@ public class HATestClusterCreator
public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
{
TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved);
- config.setObjectAttribute(_virtualHostName, "haNodeAddress", "localhost:" + newBdbPort);
- String oldBdbHostPort = (String)config.getObjectAttributes(_virtualHostName).get("haNodeAddress");
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS);
String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
String oldHost = oldHostAndPort[0];
String newBdbHostPort = oldHost + ":" + newBdbPort;
- config.setObjectAttribute(_virtualHostName, "haNodeAddress", newBdbHostPort);
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
config.setSaved(false);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java
index 124584e99c..16da78c988 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java
@@ -19,10 +19,12 @@ package org.apache.qpid.server.configuration.startup;/*
*
*/
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfigurationEntryStore;
import org.apache.qpid.server.model.Broker;
@@ -134,6 +136,110 @@ public abstract class StoreUpgrader
}
};
+ final static StoreUpgrader UPGRADE_1_3 = new StoreUpgrader("1.3")
+ {
+ private final String[] HA_ATTRIBUTES = {"haNodeName", "haGroupName", "haHelperAddress", "haCoalescingSync", "haNodeAddress","haDurability","haDesignatedPrimary","haReplicationConfig","bdbEnvironmentConfig"};
+ private final String[] JDBC_ATTRIBUTES = {"connectionURL", "connectionPool", "jdbcBigIntType", "jdbcBytesForBlob", "jdbcVarbinaryType", "jdbcBlobType", "partitionCount", "maxConnectionsPerPartition", "minConnectionsPerPartition"};
+ private final String[] STORE_TYPES = {"BDB", "BDB-HA", "JDBC", "Memory", "DERBY"};
+
+ @Override
+ protected void doUpgrade(ConfigurationEntryStore store)
+ {
+ ConfigurationEntry root = store.getRootEntry();
+ Map<String, Collection<ConfigurationEntry>> children = root.getChildren();
+ Collection<ConfigurationEntry> vhosts = children.get("VirtualHost");
+ Collection<ConfigurationEntry> changed = new ArrayList<ConfigurationEntry>();
+ for(ConfigurationEntry vhost : vhosts)
+ {
+ Map<String, Object> attributes = vhost.getAttributes();
+ Map<String, Object> newAttributes = new HashMap<String, Object>(attributes);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ String storeType = (String) attributes.get("storeType");
+ String realStoreType = storeType;
+ for (String type : STORE_TYPES)
+ {
+ if (type.equalsIgnoreCase(storeType))
+ {
+ realStoreType = type;
+ break;
+ }
+ }
+ if(attributes.containsKey("storeType"))
+ {
+ newAttributes.remove("storeType");
+ messageStoreSettings.put("storeType", realStoreType);
+ }
+ if (attributes.containsKey("storePath"))
+ {
+ messageStoreSettings.put("storePath", newAttributes.remove("storePath"));
+ }
+ if (attributes.containsKey("storeUnderfullSize"))
+ {
+ messageStoreSettings.put("storeUnderfullSize", newAttributes.remove("storeUnderfullSize"));
+ }
+ if (attributes.containsKey("storeOverfullSize"))
+ {
+ messageStoreSettings.put("storeOverfullSize", newAttributes.remove("storeOverfullSize"));
+ }
+
+ if ("BDB_HA".equals(attributes.get("type")))
+ {
+ for (String haAttribute : HA_ATTRIBUTES)
+ {
+ if(attributes.containsKey(haAttribute))
+ {
+ messageStoreSettings.put(haAttribute, newAttributes.remove(haAttribute));
+ }
+ }
+ messageStoreSettings.remove("storeType");
+ }
+ else
+ {
+
+ if ("JDBC".equalsIgnoreCase(realStoreType))
+ {
+ boolean removeAttribute = !"JDBC".equals(attributes.get("configStoreType"));
+ for (String jdbcAttribute : JDBC_ATTRIBUTES)
+ {
+ if(attributes.containsKey(jdbcAttribute))
+ {
+ Object value = null;
+ if (removeAttribute)
+ {
+ value = newAttributes.remove(jdbcAttribute);
+ }
+ else
+ {
+ value = newAttributes.get(jdbcAttribute);
+ }
+ messageStoreSettings.put(jdbcAttribute, value);
+ }
+ }
+ }
+ else if ("BDB".equals(realStoreType))
+ {
+ if(attributes.containsKey("bdbEnvironmentConfig"))
+ {
+ messageStoreSettings.put("bdbEnvironmentConfig", newAttributes.remove("bdbEnvironmentConfig"));
+ }
+ }
+ }
+
+ if (!messageStoreSettings.isEmpty())
+ {
+ newAttributes.put("messageStoreSettings", messageStoreSettings);
+ changed.add(new ConfigurationEntry(vhost.getId(),vhost.getType(), newAttributes, vhost.getChildrenIds(), store));
+ }
+ }
+ Map<String, Object> attributes = new HashMap<String, Object>(root.getAttributes());
+ attributes.put(Broker.MODEL_VERSION, "1.4");
+ changed.add(new ConfigurationEntry(root.getId(), root.getType(), attributes, root.getChildrenIds(),store));
+
+ store.save(changed.toArray(new ConfigurationEntry[changed.size()]));
+
+ }
+ };
+
private StoreUpgrader(String version)
{
_upgraders.put(version, this);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index 010d74eb7f..7b03946680 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -60,6 +60,7 @@ import org.codehaus.jackson.node.ArrayNode;
public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
{
+
public static final String STORE_TYPE = "memory";
private static final String DEFAULT_BROKER_NAME = "Broker";
@@ -545,6 +546,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
}
else if (fieldNode.isObject())
{
+ if (attributes == null)
+ {
+ attributes = new HashMap<String, Object>();
+ }
attributes.put(fieldName, toObject(fieldNode) );
}
else
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
index f940b323be..c48c7bb7f6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
@@ -34,10 +34,11 @@ public class Model
*
* 1.0 Initial version
* 1.1 Addition of mandatory virtual host type / different types of virtual host
- *
+ * 1.3 Truststore/Keystore type => trustStoreType / type => keyStoreType
+ * 1.4 Separate messageStoreSettings from virtualhost
*/
public static final int MODEL_MAJOR_VERSION = 1;
- public static final int MODEL_MINOR_VERSION = 3;
+ public static final int MODEL_MINOR_VERSION = 4;
public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION;
private static final Model MODEL_INSTANCE = new Model();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 827c01f70f..46aa8dcc8e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.model;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.Map;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.Map;
-
@ManagedObject( managesChildren = true )
public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<X>
{
@@ -48,17 +48,19 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<
String STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "storeTransactionIdleTimeoutWarn";
String STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "storeTransactionOpenTimeoutClose";
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
- String STORE_TYPE = "storeType";
- String STORE_PATH = "storePath";
- String CONFIG_STORE_TYPE = "configStoreType";
- String CONFIG_STORE_PATH = "configStorePath";
String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes";
String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes";
String DURABLE = "durable";
String LIFETIME_POLICY = "lifetimePolicy";
String SECURITY_ACL = "securityAcl";
String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount";
+ String CONFIGURATION_STORE_SETTINGS = "configurationStoreSettings";
+ String MESSAGE_STORE_SETTINGS = "messageStoreSettings";
+ @Deprecated
+ String CONFIG_STORE_TYPE = "configStoreType";
+ @Deprecated
+ String CONFIG_STORE_PATH = "configStorePath";
// Attributes
int CURRENT_CONFIG_VERSION = 4;
@@ -85,18 +87,14 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<
long getQueue_flowResumeSizeBytes();
@ManagedAttribute
+ @Deprecated
String getConfigStoreType();
@ManagedAttribute
+ @Deprecated
String getConfigStorePath();
@ManagedAttribute
- String getStoreType();
-
- @ManagedAttribute
- String getStorePath();
-
- @ManagedAttribute
long getStoreTransactionIdleTimeoutClose();
@ManagedAttribute
@@ -129,6 +127,12 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<
@ManagedAttribute
int getHouseKeepingThreadCount();
+ @ManagedAttribute
+ Map<String, Object> getMessageStoreSettings();
+
+ @ManagedAttribute
+ Map<String, Object> getConfigurationStoreSettings();
+
@ManagedStatistic
long getQueueCount();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 82051c3a41..0060703792 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -49,6 +49,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ParameterizedTypeImpl;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
@@ -66,8 +67,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
put(NAME, String.class);
put(TYPE, String.class);
- put(STORE_PATH, String.class);
- put(STORE_TYPE, String.class);
put(STATE, State.class);
put(QUEUE_ALERT_REPEAT_GAP, Long.class);
@@ -86,6 +85,9 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
put(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Long.class);
put(STORE_TRANSACTION_OPEN_TIMEOUT_WARN, Long.class);
+ put(MESSAGE_STORE_SETTINGS, new ParameterizedTypeImpl(Map.class, String.class, Object.class));
+ put(CONFIGURATION_STORE_SETTINGS, new ParameterizedTypeImpl(Map.class, String.class, Object.class));
+
put(CONFIG_STORE_TYPE, String.class);
put(CONFIG_STORE_PATH, String.class);
@@ -126,20 +128,14 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
{
throw new IllegalConfigurationException("Virtual host type must be specified");
}
- }
-
- private void validateAttributes(String type)
- {
final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type);
if(factory == null)
{
throw new IllegalArgumentException("Unknown virtual host type '"+ type +"'. Valid types are: " + VirtualHostFactory.TYPES.get());
}
factory.validateAttributes(getActualAttributes());
-
}
-
public Collection<VirtualHostAlias> getAliases()
{
return Collections.unmodifiableCollection(_aliases);
@@ -737,14 +733,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
{
// TODO
}
- else if(STORE_TYPE.equals(name))
- {
- return _virtualHost.getMessageStore().getStoreType();
- }
- else if(STORE_PATH.equals(name))
- {
- return _virtualHost.getMessageStore().getStoreLocation();
- }
}
return super.getAttribute(name);
}
@@ -832,18 +820,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
}
@Override
- public String getStoreType()
- {
- return _virtualHost.getMessageStore().getStoreType();
- }
-
- @Override
- public String getStorePath()
- {
- return _virtualHost.getMessageStore().getStoreLocation();
- }
-
- @Override
public long getStoreTransactionIdleTimeoutClose()
{
return (Long)getAttribute(VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
@@ -897,6 +873,20 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
return (Long)getAttribute(VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getMessageStoreSettings()
+ {
+ return (Map<String, Object>)getAttribute(VirtualHost.MESSAGE_STORE_SETTINGS);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, Object> getConfigurationStoreSettings()
+ {
+ return (Map<String, Object>)getAttribute(VirtualHost.CONFIGURATION_STORE_SETTINGS);
+ }
+
@Override
public long getQueueCount()
{
@@ -1118,4 +1108,5 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo
{
return super.getTaskExecutor();
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
index 546f2fa05d..eb981bdd02 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
@@ -32,7 +33,7 @@ public interface JDBCConnectionProviderFactory extends Pluggable
{
String getType();
- ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost)
+ ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly)
throws SQLException;
static final class TYPES
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 626b999d72..b3a6216c84 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -28,6 +28,11 @@ import org.apache.qpid.server.model.VirtualHost;
*/
public interface MessageStore
{
+ String STORE_TYPE = "storeType";
+ String STORE_PATH = "storePath";
+ String UNDERFULL_SIZE = "storeUnderfullSize";
+ String OVERFULL_SIZE = "storeOverfullSize";
+
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
deleted file mode 100644
index 93b669e6e4..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class MessageStoreConstants
-{
-
- public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
- public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
- public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
- public static final String OVERFULL_SIZE_ATTRIBUTE = "storeOverfullSize";
- public static final String UNDERFULL_SIZE_ATTRIBUTE = "storeUnderfullSize";
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
index 3543ce3bcf..b759bd5dc4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
@@ -321,21 +321,7 @@ public class MapValueConverter
else if (typeObject instanceof ParameterizedType)
{
ParameterizedType parameterizedType= (ParameterizedType)typeObject;
- Type type = parameterizedType.getRawType();
- if (type == Set.class)
- {
- Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
- if (actualTypeArguments.length != 1)
- {
- throw new IllegalArgumentException("Set type argument is not specified");
- }
- Class<?> classObject = (Class<?>)actualTypeArguments[0];
- value = toSet(rawValue, classObject, attributeName);
- }
- else
- {
- throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported");
- }
+ value = convertParameterizedType(rawValue, parameterizedType, attributeName);
}
else
{
@@ -352,6 +338,62 @@ public class MapValueConverter
return attributes;
}
+ private static Object convertParameterizedType(Object rawValue, ParameterizedType parameterizedType, String attributeName)
+ {
+ Type type = parameterizedType.getRawType();
+ Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+ Object convertedValue;
+ if (type == Set.class)
+ {
+ if (actualTypeArguments.length != 1)
+ {
+ throw new IllegalArgumentException("Unexpected number of Set type arguments " + actualTypeArguments.length);
+ }
+ Class<?> classObject = (Class<?>)actualTypeArguments[0];
+ convertedValue = toSet(rawValue, classObject, attributeName);
+ }
+ else if (type == Map.class)
+ {
+ if (actualTypeArguments.length != 2)
+ {
+ throw new IllegalArgumentException("Unexpected number of Map type arguments " + actualTypeArguments.length);
+ }
+ Class<?> keyClassObject = (Class<?>)actualTypeArguments[0];
+ Class<?> valueClassObject = (Class<?>)actualTypeArguments[1];
+ convertedValue = toMap(rawValue, keyClassObject, valueClassObject, attributeName);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported");
+ }
+ return convertedValue;
+ }
+
+ private static <K,V> Map<K, V> toMap(Object rawValue, Class<K> keyClassObject, Class<V> valueClassObject, String attributeName)
+ {
+ if (rawValue == null)
+ {
+ return null;
+ }
+ if (rawValue instanceof Map)
+ {
+ Map<K, V> convertedMap = new HashMap<K, V>();
+ Map<?, ?> rawMap = (Map<?,?>)rawValue;
+
+ for (Map.Entry<?, ?> entry : rawMap.entrySet())
+ {
+ K convertedKey = convert(entry.getKey(), keyClassObject, attributeName + " (map key)");
+ V convertedValue = convert(entry.getValue(), valueClassObject, attributeName + " (map value)");
+ convertedMap.put(convertedKey, convertedValue);
+ }
+ return convertedMap;
+ }
+ else
+ {
+ throw new IllegalArgumentException("rawValue is not of unexpected type Map, was : " + rawValue.getClass());
+ }
+ }
+
public static <T> Set<T> toSet(Object rawValue, Class<T> setItemClass, String attributeName)
{
if (rawValue == null)
@@ -361,7 +403,7 @@ public class MapValueConverter
HashSet<T> set = new HashSet<T>();
if (rawValue instanceof Iterable)
{
- Iterable<?> iterable = (Iterable<?>)rawValue;
+ Iterable<?> iterable = (Iterable<?>)rawValue;
for (Object object : iterable)
{
T converted = convert(object, setItemClass, attributeName);
@@ -409,6 +451,10 @@ public class MapValueConverter
{
value = toEnum(attributeName, rawValue, (Class<Enum>) classObject);
}
+ else if (classObject == Object.class)
+ {
+ value = rawValue;
+ }
else
{
throw new IllegalArgumentException("Cannot convert '" + rawValue + "' of type '" + rawValue.getClass()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index da2ef47670..47c50115d3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -19,6 +19,8 @@ package org.apache.qpid.server.virtualhost;/*
*
*/
+import java.util.Map;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
@@ -46,10 +48,9 @@ public class StandardVirtualHost extends AbstractVirtualHost
- private MessageStore initialiseMessageStore(VirtualHost virtualHost)
+ private MessageStore initialiseMessageStore(String storeType)
{
- final String storeTypeAttr = (String) virtualHost.getAttribute(VirtualHost.STORE_TYPE);
- MessageStore messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeTypeAttr).createMessageStore();
+ MessageStore messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
MessageStoreLogSubject
storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
@@ -83,7 +84,9 @@ public class StandardVirtualHost extends AbstractVirtualHost
protected void initialiseStorage(VirtualHost virtualHost)
{
- _messageStore = initialiseMessageStore(virtualHost);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
+ _messageStore = initialiseMessageStore(storeType);
_durableConfigurationStore = initialiseConfigurationStore(virtualHost);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
index 9cab87e3b4..7cc8eaa20c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
public class StandardVirtualHostFactory implements VirtualHostFactory
{
@@ -50,19 +51,26 @@ public class StandardVirtualHostFactory implements VirtualHostFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS);
+ if (messageStoreSettings == null)
+ {
+ throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required.");
+ }
+
+ Object storeType = messageStoreSettings.get(MessageStore.STORE_TYPE);
// need store type and path
Collection<String> knownTypes = MessageStoreFactory.FACTORY_LOADER.getSupportedTypes();
- Object storeType = attributes.get(org.apache.qpid.server.model.VirtualHost.STORE_TYPE);
if (storeType == null)
{
- throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE
- +"' is required. Known types are : " + knownTypes);
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+ +"' is required in attribute " + org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + ". Known types are : " + knownTypes);
}
else if (!(storeType instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+"' is required and must be of type String. "
+"Known types are : " + knownTypes);
}
@@ -70,7 +78,7 @@ public class StandardVirtualHostFactory implements VirtualHostFactory
MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get((String)storeType);
if(factory == null)
{
- throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE
+"' has value '" + storeType + "' which is not one of the valid values: "
+ "Known types are : " + knownTypes);
}
diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json
index 7173433aa8..efc98f6ac8 100644
--- a/qpid/java/broker-core/src/main/resources/initial-config.json
+++ b/qpid/java/broker-core/src/main/resources/initial-config.json
@@ -21,7 +21,7 @@
{
"name": "Broker",
"storeVersion": 1,
- "modelVersion": "1.2",
+ "modelVersion": "1.4",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
"name" : "passwordFile",
@@ -55,8 +55,10 @@
"virtualhosts" : [ {
"name" : "default",
"type" : "STANDARD",
- "storeType" : "DERBY",
- "storePath" : "${qpid.work_dir}/derbystore/default"
+ "messageStoreSettings" : {
+ "storeType" : "DERBY",
+ "storePath" : "${qpid.work_dir}/derbystore/default"
+ }
} ],
"plugins" : [ {
"pluginType" : "MANAGEMENT-HTTP",
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java
new file mode 100644
index 0000000000..1f435b502f
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration.startup;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.VirtualHost;
+
+public class StoreUpgraderTest extends TestCase
+{
+
+ private final UUID _brokerId = UUID.randomUUID();
+ private final UUID _virtualHostId = UUID.randomUUID();
+ private ConfigurationEntryStore _store = mock(ConfigurationEntryStore.class);
+
+ public void testUpgrade13To14_Derby() throws Exception
+ {
+ HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("name", "test");
+ virtualHostAttributes.put("type", "STANDARD");
+ virtualHostAttributes.put("storeType", "DERBy");
+ virtualHostAttributes.put("storePath", "/mystorepath");
+ virtualHostAttributes.put("storeUnderfullSize", 1000);
+ virtualHostAttributes.put("storeOverfullSize", 2000);
+
+ doTest(_store, virtualHostAttributes);
+
+ ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store);
+ ConfigurationEntry expectedNewVirtualHost;
+ {
+ Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>();
+ expectedNewVirtualHostMessageSettings.put("storeType", "DERBY");
+ expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath");
+ expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000);
+ expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000);
+
+ Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>();
+ expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test");
+ expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD");
+ expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings);
+
+ expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store);
+ }
+ verify(_store).save(expectedNewVirtualHost, expectNewRoot);
+ }
+
+ public void testUpgrade13To14_BdbHa() throws Exception
+ {
+ HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("name", "test");
+ virtualHostAttributes.put("type", "BDB_HA");
+ virtualHostAttributes.put("storeType", "BdB-HA");
+ virtualHostAttributes.put("storePath", "/mystorepath");
+ virtualHostAttributes.put("storeUnderfullSize", 1000);
+ virtualHostAttributes.put("storeOverfullSize", 2000);
+ virtualHostAttributes.put("haNodeName", "node1");
+ virtualHostAttributes.put("haGroupName", "group1");
+ virtualHostAttributes.put("haHelperAddress", "helper:1000");
+ virtualHostAttributes.put("haCoalescingSync", true);
+ virtualHostAttributes.put("haNodeAddress", "nodeaddr:1000");
+ virtualHostAttributes.put("haDurability", "sync,sync,all");
+ virtualHostAttributes.put("haDesignatedPrimary", true);
+ virtualHostAttributes.put("haReplicationConfig", Collections.singletonMap("hasettings", "havalue"));
+ virtualHostAttributes.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue"));
+
+ doTest(_store, virtualHostAttributes);
+
+ ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store);
+ ConfigurationEntry expectedNewVirtualHost;
+ {
+ Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>();
+ expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath");
+ expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000);
+ expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000);
+ expectedNewVirtualHostMessageSettings.put("haNodeName", "node1");
+ expectedNewVirtualHostMessageSettings.put("haGroupName", "group1");
+ expectedNewVirtualHostMessageSettings.put("haHelperAddress", "helper:1000");
+ expectedNewVirtualHostMessageSettings.put("haCoalescingSync", true);
+ expectedNewVirtualHostMessageSettings.put("haNodeAddress", "nodeaddr:1000");
+ expectedNewVirtualHostMessageSettings.put("haDurability", "sync,sync,all");
+ expectedNewVirtualHostMessageSettings.put("haDesignatedPrimary", true);
+ expectedNewVirtualHostMessageSettings.put("haReplicationConfig", Collections.singletonMap("hasettings", "havalue"));
+ expectedNewVirtualHostMessageSettings.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue"));
+
+ Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>();
+ expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test");
+ expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "BDB_HA");
+ expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings);
+
+ expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store);
+ }
+ verify(_store).save(expectedNewVirtualHost, expectNewRoot);
+ }
+
+ public void testUpgrade13To14_Bdb() throws Exception
+ {
+ HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("name", "test");
+ virtualHostAttributes.put("type", "STANDARD");
+ virtualHostAttributes.put("storeType", "BdB");
+ virtualHostAttributes.put("storePath", "/mystorepath");
+ virtualHostAttributes.put("storeUnderfullSize", 1000);
+ virtualHostAttributes.put("storeOverfullSize", 2000);
+ virtualHostAttributes.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue"));
+
+ doTest(_store, virtualHostAttributes);
+
+ ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store);
+ ConfigurationEntry expectedNewVirtualHost;
+ {
+ Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>();
+ expectedNewVirtualHostMessageSettings.put("storeType", "BDB");
+ expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath");
+ expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000);
+ expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000);
+ expectedNewVirtualHostMessageSettings.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue"));
+
+ Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>();
+ expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test");
+ expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD");
+ expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings);
+
+ expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store);
+ }
+ verify(_store).save(expectedNewVirtualHost, expectNewRoot);
+ }
+
+ public void testUpgrade13To14_JDBC() throws Exception
+ {
+ HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+ virtualHostAttributes.put("name", "test");
+ virtualHostAttributes.put("type", "STANDARD");
+ virtualHostAttributes.put("storeType", "JdBC");
+ virtualHostAttributes.put("connectionURL", "jdbc:test");
+ virtualHostAttributes.put("connectionPool", "BONECP");
+ virtualHostAttributes.put("jdbcBigIntType", "NUMBER");
+ virtualHostAttributes.put("jdbcBytesForBlob", true);
+ virtualHostAttributes.put("jdbcVarbinaryType", "TEST");
+ virtualHostAttributes.put("jdbcBlobType", "BLOB");
+ virtualHostAttributes.put("partitionCount", 10);
+ virtualHostAttributes.put("maxConnectionsPerPartition", 8);
+ virtualHostAttributes.put("minConnectionsPerPartition", 2);
+
+ doTest(_store, virtualHostAttributes);
+
+ ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store);
+ ConfigurationEntry expectedNewVirtualHost;
+ {
+ Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>();
+ expectedNewVirtualHostMessageSettings.put("storeType", "JDBC");
+ expectedNewVirtualHostMessageSettings.put("connectionURL", "jdbc:test");
+ expectedNewVirtualHostMessageSettings.put("connectionPool", "BONECP");
+ expectedNewVirtualHostMessageSettings.put("jdbcBigIntType", "NUMBER");
+ expectedNewVirtualHostMessageSettings.put("jdbcBytesForBlob", true);
+ expectedNewVirtualHostMessageSettings.put("jdbcVarbinaryType", "TEST");
+ expectedNewVirtualHostMessageSettings.put("jdbcBlobType", "BLOB");
+ expectedNewVirtualHostMessageSettings.put("partitionCount", 10);
+ expectedNewVirtualHostMessageSettings.put("maxConnectionsPerPartition", 8);
+ expectedNewVirtualHostMessageSettings.put("minConnectionsPerPartition", 2);
+
+ Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>();
+ expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test");
+ expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD");
+ expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings);
+
+ expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store);
+ }
+ verify(_store).save(expectedNewVirtualHost, expectNewRoot);
+ }
+
+ private void doTest(ConfigurationEntryStore store, Map<String,Object> virtualHostAttributes)
+ {
+ final ConfigurationEntry virtualHostEntry = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), virtualHostAttributes, Collections.<UUID>emptySet(), store);
+
+ final ConfigurationEntry rootEntry;
+ {
+ Map<String, Object> rootEntryAttributes = Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.3");
+ rootEntry = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), rootEntryAttributes, Collections.singleton(_virtualHostId), store);
+ }
+
+ when(store.getRootEntry()).thenReturn(rootEntry);
+ when(store.getEntry(_virtualHostId)).thenReturn(virtualHostEntry);
+
+ StoreUpgrader.UPGRADE_1_3.doUpgrade(store);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
index 80f935a55e..0eea2663fd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.configuration.startup;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +35,7 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
@@ -54,8 +56,7 @@ public class VirtualHostRecovererTest extends TestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
-
- attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE));
when(entry.getAttributes()).thenReturn(attributes);
VirtualHost host = recoverer.create(null, entry, parent);
@@ -68,15 +69,9 @@ public class VirtualHostRecovererTest extends TestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
- attributes.put(VirtualHost.TYPE, "STANDARD");
- String[] mandatoryAttributes = {VirtualHost.NAME, VirtualHost.TYPE};
-
- checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes);
-
- attributes = new HashMap<String, Object>();
- attributes.put(VirtualHost.NAME, getName());
- attributes.put(VirtualHost.STORE_TYPE, "MEMORY");
- mandatoryAttributes = new String[]{VirtualHost.NAME, VirtualHost.STORE_TYPE};
+ attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE));
+ String[] mandatoryAttributes = {VirtualHost.NAME, VirtualHost.TYPE, VirtualHost.MESSAGE_STORE_SETTINGS};
checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 2410df1fe0..266049e611 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
@@ -88,7 +89,7 @@ public class VirtualHostTest extends QpidTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE));
attributes.put(VirtualHost.STATE, State.QUIESCED);
VirtualHost host = createHost(attributes);
@@ -149,7 +150,7 @@ public class VirtualHostTest extends QpidTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE));
VirtualHost host = createHost(attributes);
return host;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index a9036a49a4..931fe36d5d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -84,16 +84,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private UUID _queueId;
private UUID _exchangeId;
private DurableConfigurationStore _configStore;
+ protected Map<String, Object> _messageStoreSettings;
public void setUp() throws Exception
{
super.setUp();
+ _messageStoreSettings = new HashMap<String, Object>();
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
+ _messageStoreSettings.put(MessageStore.STORE_PATH, _storePath);
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
@@ -114,7 +117,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(_exchange.getId()).thenReturn(_exchangeId);
when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
when(_exchange.getEventLogger()).thenReturn(new EventLogger());
- when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath);
+ when(_virtualHost.getMessageStoreSettings()).thenReturn(_messageStoreSettings);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 7b29a48d60..908f3fe6e1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -33,10 +36,6 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource
{
private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class);
@@ -50,9 +49,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
private UUID _transactionResource;
protected abstract MessageStore createStore() throws Exception;
-
- protected abstract void applyStoreSpecificConfiguration(VirtualHost virtualHost);
-
+ protected abstract VirtualHost<?> createVirtualHost(String storeLocation);
protected abstract int getNumberOfMessagesToFillStore();
@Override
@@ -64,11 +61,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
FileUtils.delete(_storeLocation, true);
- VirtualHost vhost = mock(VirtualHost.class);
- when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath());
- when(vhost.getName()).thenReturn("test");
-
- applyStoreSpecificConfiguration(vhost);
+ VirtualHost<?> vhost = createVirtualHost(_storeLocation.getAbsolutePath());
_store = createStore();
((DurableConfigurationStore)_store).configureConfigStore(vhost, null);
@@ -82,6 +75,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_store.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
+
@Override
public void tearDown() throws Exception
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
index 3ee98f9a21..1996620950 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import java.util.EnumSet;
+
import junit.framework.TestCase;
public class StateManagerTest extends TestCase implements EventListener
@@ -115,7 +116,7 @@ public class StateManagerTest extends TestCase implements EventListener
performValidTransition(StateManager.INITIALISE_COMPLETE);
performValidTransition(StateManager.CLOSE_INITIALISED);
performValidTransition(StateManager.CLOSE_COMPLETE);
-
+
_manager = new StateManager(this);
performValidTransition(StateManager.INITIALISE);
performValidTransition(StateManager.INITIALISE_COMPLETE);
@@ -141,13 +142,13 @@ public class StateManagerTest extends TestCase implements EventListener
performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
performInvalidTransitions(StateManager.INITIALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
- performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE, State.CLOSING);
performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);
performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
performInvalidTransitions(StateManager.CLOSE_COMPLETE);
-
+
}
private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates)
@@ -156,7 +157,7 @@ public class StateManagerTest extends TestCase implements EventListener
{
performValidTransition(preTransition);
}
-
+
EnumSet<State> endStates = EnumSet.allOf(State.class);
if(validEndStates != null)
@@ -166,13 +167,13 @@ public class StateManagerTest extends TestCase implements EventListener
endStates.remove(state);
}
}
-
+
for(State invalidEndState : endStates)
{
performInvalidStateTransition(invalidEndState);
}
-
+
}
private void performInvalidStateTransition(State invalidEndState)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 0dc25a2ad2..fd56f3fa1c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
@@ -108,8 +109,10 @@ public class BrokerTestHelper
when(virtualHost.getType()).thenReturn(StandardVirtualHostFactory.TYPE);
when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE);
- when(virtualHost.getStoreType()).thenReturn(TestableMemoryMessageStore.TYPE);
- when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)).thenReturn(TestableMemoryMessageStore.TYPE);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE);
+
+ when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(virtualHost.getName()).thenReturn(name);
return createVirtualHost(virtualHostRegistry, virtualHost);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index 570e748d7a..35b4b89bf6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.binding.BindingImpl;
-
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -42,7 +41,9 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.JsonFileConfigStore;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -249,13 +250,18 @@ public class StandardVirtualHostTest extends QpidTestCase
private VirtualHost createVirtualHost(String virtualHostName) throws Exception
{
Broker<?> broker = BrokerTestHelper.createBrokerMock();
+
_virtualHostRegistry = broker.getVirtualHostRegistry();
org.apache.qpid.server.model.VirtualHost<?> model = mock(org.apache.qpid.server.model.VirtualHost.class);
when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.CONFIG_STORE_TYPE)).thenReturn(JsonFileConfigStore.TYPE);
when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.CONFIG_STORE_PATH)).thenReturn(_storeFolder.getAbsolutePath());
- when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)).thenReturn(TestMemoryMessageStore.TYPE);
+
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE);
+ when(model.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(model.getName()).thenReturn(virtualHostName);
+
VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class),
new SecurityManager(broker, false), model);
_virtualHostRegistry.registerVirtualHost(host);
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 5d56329c20..25ce3b8adc 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -32,6 +32,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
@@ -39,7 +41,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.util.FileUtils;
@@ -130,13 +131,25 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
{
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
_driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
- String databasePath = isConfigStoreOnly() ? (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH) : (String) virtualHost.getAttribute(VirtualHost.STORE_PATH);
+ String databasePath = null;
+ if (isConfigStoreOnly())
+ {
+ databasePath = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ }
+ else
+ {
+ if (messageStoreSettings != null)
+ {
+ databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
+ }
+ }
+
if(databasePath == null)
{
- databasePath = defaultPath;
+ databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
}
if(!MEMORY_STORE_LOCATION.equals(databasePath))
@@ -154,8 +167,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
_storeLocation = databasePath;
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null ? -1l :
overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString());
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
index d64420a808..4e81c4e9ba 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
@@ -52,12 +52,15 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableCon
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+
+ if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
+ Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH
+"' is required and must be of type String.");
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index 479675dac1..f23b5a3e23 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -20,15 +20,17 @@
*/
package org.apache.qpid.server.store.derby;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class);
@@ -50,17 +52,26 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
}
@Override
- protected void applyStoreSpecificConfiguration(VirtualHost vhost)
+ protected MessageStore createStore() throws Exception
{
- _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
-
- when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE);
- when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE);
+ return new DerbyMessageStore();
}
@Override
- protected MessageStore createStore() throws Exception
+ protected VirtualHost<?> createVirtualHost(String storeLocation)
{
- return new DerbyMessageStore();
+ _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+
+ VirtualHost<?> vhost = mock(VirtualHost.class);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
+ messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
+ messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
+
+ when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings );
+ when(vhost.getName()).thenReturn("test");
+
+ return vhost;
}
+
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 859fad629b..e3f91cc8fb 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.server.store.derby;
+import static org.mockito.Mockito.when;
+
import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.util.FileUtils;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
public class DerbyMessageStoreTest extends MessageStoreTestCase
{
private String _storeLocation;
@@ -63,7 +64,9 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
{
_storeLocation = TMP_FOLDER + File.separator + getTestName();
- when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
+ when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
deleteStoreIfExists();
}
diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
index 1d9ff9a8e1..370f92651c 100644
--- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
+++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java
@@ -20,29 +20,47 @@
*/
package org.apache.qpid.server.store.jdbc.bonecp;
-import com.jolbox.bonecp.BoneCP;
-import com.jolbox.bonecp.BoneCPConfig;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Map;
+
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
+import org.apache.qpid.server.util.MapValueConverter;
+
+import com.jolbox.bonecp.BoneCP;
+import com.jolbox.bonecp.BoneCPConfig;
public class BoneCPConnectionProvider implements ConnectionProvider
{
+ public static final String PARTITION_COUNT = "partitionCount";
+ public static final String MAX_CONNECTIONS_PER_PARTITION = "maxConnectionsPerPartition";
+ public static final String MIN_CONNECTIONS_PER_PARTITION = "minConnectionsPerPartition";
+
public static final int DEFAULT_MIN_CONNECTIONS_PER_PARTITION = 5;
public static final int DEFAULT_MAX_CONNECTIONS_PER_PARTITION = 10;
public static final int DEFAULT_PARTITION_COUNT = 4;
+
private final BoneCP _connectionPool;
- public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException
+ public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly) throws SQLException
{
BoneCPConfig config = new BoneCPConfig();
config.setJdbcUrl(connectionUrl);
-
- config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, "minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION));
- config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, "maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION));
- config.setPartitionCount(getIntegerAttribute(virtualHost, "partitionCount",DEFAULT_PARTITION_COUNT));
+ if (configStoreOnly)
+ {
+ config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, MIN_CONNECTIONS_PER_PARTITION, DEFAULT_MIN_CONNECTIONS_PER_PARTITION));
+ config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, MAX_CONNECTIONS_PER_PARTITION, DEFAULT_MAX_CONNECTIONS_PER_PARTITION));
+ config.setPartitionCount(getIntegerAttribute(virtualHost, PARTITION_COUNT,DEFAULT_PARTITION_COUNT));
+ }
+ else
+ {
+ Map<String, Object> storeSettings = virtualHost.getMessageStoreSettings();
+ config.setMinConnectionsPerPartition(MapValueConverter.getIntegerAttribute(MIN_CONNECTIONS_PER_PARTITION, storeSettings, DEFAULT_MIN_CONNECTIONS_PER_PARTITION));
+ config.setMaxConnectionsPerPartition(MapValueConverter.getIntegerAttribute(MAX_CONNECTIONS_PER_PARTITION, storeSettings, DEFAULT_MAX_CONNECTIONS_PER_PARTITION));
+ config.setPartitionCount(MapValueConverter.getIntegerAttribute(PARTITION_COUNT, storeSettings,DEFAULT_PARTITION_COUNT));
+ }
_connectionPool = new BoneCP(config);
}
diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
index b95ad1166c..f12e7f35e6 100644
--- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
+++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store.jdbc.bonecp;
import java.sql.SQLException;
+
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.jdbc.ConnectionProvider;
@@ -34,9 +35,9 @@ public class BoneCPConnectionProviderFactory implements JDBCConnectionProviderFa
}
@Override
- public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost)
+ public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly)
throws SQLException
{
- return new BoneCPConnectionProvider(connectionUrl, virtualHost);
+ return new BoneCPConnectionProvider(connectionUrl, virtualHost, configStoreOnly);
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
index 8fc7de12d0..cc7fda4c82 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java
@@ -34,7 +34,7 @@ public class DefaultConnectionProviderFactory implements JDBCConnectionProviderF
@Override
public ConnectionProvider getConnectionProvider(String connectionUrl,
- VirtualHost virtualHost)
+ VirtualHost virtualHost, boolean configStoreOnly)
{
return new DefaultConnectionProvider(connectionUrl);
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 621759ef85..d8fb124c69 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.util.MapValueConverter;
/**
* An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
@@ -48,10 +50,14 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class);
-
public static final String TYPE = "JDBC";
public static final String CONNECTION_URL = "connectionURL";
public static final String CONFIG_CONNECTION_URL = "configConnectionURL";
+ public static final String CONNECTION_POOL = "connectionPool";
+ public static final String JDBC_BIG_INT_TYPE = "jdbcBigIntType";
+ public static final String JDBC_BYTES_FOR_BLOB = "jdbcBytesForBlob";
+ public static final String JDBC_VARBINARY_TYPE = "jdbcVarbinaryType";
+ public static final String JDBC_BLOB_TYPE = "jdbcBlobType";
protected String _connectionURL;
private ConnectionProvider _connectionProvider;
@@ -280,19 +286,24 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
VirtualHost virtualHost)
throws ClassNotFoundException, SQLException
{
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
String connectionURL;
- if(!isConfigStoreOnly())
+ Object poolAttribute = null;
+ boolean configStoreOnly = isConfigStoreOnly();
+ if(!configStoreOnly)
{
- connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null
- ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH))
- : String.valueOf(virtualHost.getAttribute(CONNECTION_URL));
+ connectionURL = messageStoreSettings.get(CONNECTION_URL) == null
+ ? String.valueOf(messageStoreSettings.get(MessageStore.STORE_PATH))
+ : String.valueOf(messageStoreSettings.get(CONNECTION_URL));
+ poolAttribute = messageStoreSettings.get(CONNECTION_POOL);
}
else
{
connectionURL = virtualHost.getAttribute(CONFIG_CONNECTION_URL) == null
? String.valueOf(virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH))
: String.valueOf(virtualHost.getAttribute(CONFIG_CONNECTION_URL));
+ poolAttribute = virtualHost.getAttribute(CONNECTION_POOL);
}
JDBCDetails details = null;
@@ -312,8 +323,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
details = DERBY_DETAILS;
}
-
- Object poolAttribute = virtualHost.getAttribute("connectionPool");
String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute);
JDBCConnectionProviderFactory connectionProviderFactory =
@@ -324,12 +333,22 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
connectionProviderFactory = new DefaultConnectionProviderFactory();
}
- _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost);
+ _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost, configStoreOnly);
- _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType());
- _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType());
- _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob());
- _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType());
+ if(!configStoreOnly)
+ {
+ _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, messageStoreSettings, details.getBlobType());
+ _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, messageStoreSettings, details.getVarBinaryType());
+ _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, messageStoreSettings, details.isUseBytesMethodsForBlob());
+ _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, messageStoreSettings, details.getBigintType());
+ }
+ else
+ {
+ _blobType = getStringAttribute(virtualHost, JDBC_BLOB_TYPE,details.getBlobType());
+ _varBinaryType = getStringAttribute(virtualHost, JDBC_VARBINARY_TYPE,details.getVarBinaryType());
+ _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, JDBC_BYTES_FOR_BLOB,details.isUseBytesMethodsForBlob());
+ _bigIntType = getStringAttribute(virtualHost, JDBC_BIG_INT_TYPE, details.getBigintType());
+ }
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
index d22fc21b74..0a8f682f16 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java
@@ -52,18 +52,17 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConf
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- if(getType().equals(attributes.get(VirtualHost.STORE_TYPE)))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+
+ if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE)))
{
- Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL);
+ Object connectionURL = messageStoreSettings.get(JDBCMessageStore.CONNECTION_URL);
if(!(connectionURL instanceof String))
{
- Object storePath = attributes.get(VirtualHost.STORE_PATH);
- if(!(storePath instanceof String))
- {
- throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL
- +"' is required and must be of type String.");
+ throw new IllegalArgumentException("Setting '"+ JDBCMessageStore.CONNECTION_URL
+ +"' is required and must be of type String.");
- }
}
}
if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE)))
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 9c348383c6..65bf795045 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.store.jdbc;
+import java.io.File;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.model.VirtualHost;
@@ -66,10 +69,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
-
- when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL);
+ when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
}
+
@Override
protected MessageStore createMessageStore()
{
diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json
index 60f3f7f174..dac8ee4fd4 100644
--- a/qpid/java/systests/etc/config-systests.json
+++ b/qpid/java/systests/etc/config-systests.json
@@ -22,7 +22,7 @@
"name": "Broker",
"defaultVirtualHost" : "test",
"storeVersion": 1,
- "modelVersion": "1.3",
+ "modelVersion": "1.4",
"authenticationproviders" : [ {
"name" : "plain",
"type" : "PlainPasswordFile",
@@ -60,7 +60,9 @@
"virtualhosts" : [ {
"name" : "test",
"type" : "STANDARD",
- "storeType": "${messagestore.type}",
- "storePath" : "${QPID_WORK}/test"
+ "messageStoreSettings": {
+ "storeType": "${messagestore.type}",
+ "storePath" : "${QPID_WORK}/test/${test.port}"
+ }
} ]
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 9dc981a358..36e86fbe7b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -115,9 +115,12 @@ public class MessageStoreTest extends QpidTestCase
String hostName = getName();
_storePath = System.getProperty("QPID_WORK", TMP_FOLDER + File.separator + getTestName()) + File.separator + hostName;
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, _storePath);
+ messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
+
_virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
- when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath);
- when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(getTestProfileMessageStoreType());
+ when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName);
when(_virtualHostModel.getType()).thenReturn(StandardVirtualHostFactory.TYPE);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 9bc1a57261..7017ea6d45 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,14 +51,15 @@ public class
@Override
public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
{
- Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE);
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null
? Long.MAX_VALUE
: overfullAttr instanceof Number
? ((Number)overfullAttr).longValue()
: Long.parseLong(overfullAttr.toString());
- Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE);
+ Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
_persistentSizeLowThreshold = overfullAttr == null
? _persistentSizeHighThreshold
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 6b03151f29..cc3e7574b8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -21,54 +21,71 @@
package org.apache.qpid.server.store;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageStoreFactory;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(SlowMessageStore.class);
+
+ public static final String TYPE = "SLOW";
+ public static final String DELAYS = "delays";
+ public static final String REAL_STORE = "realStore";
+
+ private static final String DEFAULT_DELAY = "default";
+ private static final String PRE = "pre";
+ private static final String POST = "post";
+
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
private MessageStore _realStore = null;
private DurableConfigurationStore _durableConfigurationStore = null;
- private static final String PRE = "pre";
- private static final String POST = "post";
- public static final String TYPE = "SLOW";
- private String DEFAULT_DELAY = "default";
+
+ private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>();
// ***** MessageStore Interface.
+ @Override
public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
{
_logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
- Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays");
+ Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Object delaysAttr = messageStoreSettings.get(DELAYS);
@SuppressWarnings({ "unchecked" })
Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
configureDelays(delays);
- final Object realStoreAttr = virtualHost.getAttribute("realStore");
+ final Object realStoreAttr = messageStoreSettings.get(REAL_STORE);
String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString();
-
if (delays.containsKey(DEFAULT_DELAY))
{
_defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
}
_realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore();
-
+
+ if (!_eventListeners.isEmpty())
+ {
+ for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
+ {
+ Map.Entry<EventListener, Event[]> entry = it.next();
+ _realStore.addEventListener(entry.getKey(), entry.getValue());
+ it.remove();
+ }
+ }
+
if (_realStore instanceof DurableConfigurationStore)
{
_durableConfigurationStore = (DurableConfigurationStore)_realStore;
@@ -141,13 +158,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
}
-
+ @Override
public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
TransactionLogRecoveryHandler tlogRecoveryHandler)
{
_realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
}
+ @Override
public void close()
{
doPreDelay("close");
@@ -155,12 +173,12 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("close");
}
+ @Override
public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
return _realStore.addMessage(metaData);
}
-
@Override
public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
@@ -210,6 +228,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("update");
}
+ @Override
public Transaction newTransaction()
{
doPreDelay("beginTran");
@@ -218,27 +237,12 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
return txn;
}
-
+ @Override
public boolean isPersistent()
{
return _realStore.isPersistent();
}
- public void storeMessageHeader(Long messageNumber, ServerMessage message)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void storeContent(Long messageNumber, long offset, ByteBuffer body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ServerMessage getMessage(Long messageNumber)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
private class SlowTransaction implements Transaction
{
private final Transaction _underlying;
@@ -248,6 +252,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
_underlying = underlying;
}
+ @Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
doPreDelay("enqueueMessage");
@@ -255,6 +260,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("enqueueMessage");
}
+ @Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
doPreDelay("dequeueMessage");
@@ -262,6 +268,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("dequeueMessage");
}
+ @Override
public void commitTran()
{
doPreDelay("commitTran");
@@ -269,6 +276,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("commitTran");
}
+ @Override
public StoreFuture commitTranAsync()
{
doPreDelay("commitTran");
@@ -277,6 +285,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
return future;
}
+ @Override
public void abortTran()
{
doPreDelay("abortTran");
@@ -284,11 +293,13 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
doPostDelay("abortTran");
}
+ @Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
_underlying.removeXid(format, globalId, branchId);
}
+ @Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
_underlying.recordXid(format, globalId, branchId, enqueues, dequeues);
@@ -304,7 +315,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- _realStore.addEventListener(eventListener, events);
+ if (_realStore == null)
+ {
+ _eventListeners .put(eventListener, events);
+ }
+ else
+ {
+ _realStore.addEventListener(eventListener, events);
+ }
}
@Override
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
index eca91c2b5e..3dea6fd5bb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,10 +64,13 @@ public class StoreOverfullTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, QuotaMessageStore.TYPE);
+ messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
+ messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
+
TestBrokerConfiguration config = getBrokerConfiguration();
- config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_TYPE, QuotaMessageStore.TYPE);
- config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, OVERFULL_SIZE);
- config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, UNDERFULL_SIZE);
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
super.setUp();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 18774941e8..0d76f6c444 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -31,7 +31,18 @@ import java.util.Map;
import javax.jms.JMSException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -50,10 +61,10 @@ public class Asserts
ConfiguredObject.LAST_UPDATED_TIME,
ConfiguredObject.DESCRIPTION,
VirtualHost.SUPPORTED_QUEUE_TYPES,
- VirtualHost.STORE_PATH,
VirtualHost.TYPE,
VirtualHost.CONFIG_STORE_PATH,
VirtualHost.CONFIG_STORE_TYPE,
+ VirtualHost.CONFIGURATION_STORE_SETTINGS,
VirtualHost.SECURITY_ACL);
assertEquals("Unexpected value of attribute " + VirtualHost.NAME,
@@ -146,7 +157,7 @@ public class Asserts
@SuppressWarnings("unchecked")
Map<String, Object> statistics = (Map<String, Object>) queueData.get(Asserts.STATISTICS_ATTRIBUTE);
- Asserts.assertAttributesPresent(statistics,
+ Asserts.assertAttributesPresent(statistics,
"bindingCount",
"consumerCount",
"consumerCountWithCredit",
@@ -226,7 +237,7 @@ public class Asserts
@SuppressWarnings("unchecked")
Map<String, Object> statistics = (Map<String, Object>) connectionData.get(STATISTICS_ATTRIBUTE);
-
+
assertAttributesPresent(statistics,
"bytesIn",
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
index 49e07e92e8..7050dcfc33 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
@@ -524,7 +524,7 @@ public class RestTestHelper
String queueName = EXPECTED_QUEUES[i];
Map<String, Object> queueData = new HashMap<String, Object>();
queueData.put(Queue.NAME, queueName);
- queueData.put(Queue.DURABLE, Boolean.TRUE);
+ queueData.put(Queue.DURABLE, Boolean.FALSE);
submitRequest("/rest/queue/test/" + queueName, "PUT", queueData);
Map<String, Object> bindingData = new HashMap<String, Object>();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 6bc515dcef..1ae1be3101 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.ConflationQueue;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.util.FileUtils;
import org.codehaus.jackson.JsonGenerationException;
@@ -111,7 +112,10 @@ public class VirtualHostRestTest extends QpidRestTestCase
restartBroker();
Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostName);
Asserts.assertVirtualHost(hostName, hostDetails);
- assertEquals("Unexpected store type", storeType, hostDetails.get(VirtualHost.STORE_TYPE));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> messageStoreSettings = (Map<String, Object>) hostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ assertEquals("Unexpected store type", storeType, messageStoreSettings.get(MessageStore.STORE_TYPE));
assertNewVirtualHost(hostDetails);
}
@@ -155,23 +159,30 @@ public class VirtualHostRestTest extends QpidRestTestCase
String hostToUpdate = TEST3_VIRTUALHOST;
Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate);
Asserts.assertVirtualHost(hostToUpdate, hostDetails);
- String configPath = (String)hostDetails.get(VirtualHost.STORE_PATH);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> attributes = (Map<String, Object>)hostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ String configPath = (String) attributes.get(MessageStore.STORE_PATH);
String storeType = getTestProfileMessageStoreType();
String storeLocation = getStoreLocation(hostToUpdate);
+ Map<String, Object> newMessageStoreSettings = new HashMap<String, Object>();
+ newMessageStoreSettings.put(MessageStore.STORE_TYPE, storeType);
+ newMessageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
+
Map<String, Object> newAttributes = new HashMap<String, Object>();
newAttributes.put(VirtualHost.NAME, hostToUpdate);
- newAttributes.put(VirtualHost.STORE_TYPE, storeType);
- newAttributes.put(VirtualHost.STORE_PATH, storeLocation);
+ newAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, newMessageStoreSettings);
int response = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostToUpdate, "PUT", newAttributes);
assertEquals("Unexpected response code", 409, response);
restartBroker();
- hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate);
- Asserts.assertVirtualHost(hostToUpdate, hostDetails);
- assertEquals("Unexpected config path", configPath, hostDetails.get(VirtualHost.STORE_PATH));
+ Map<String, Object> rereadHostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate);
+ Asserts.assertVirtualHost(hostToUpdate, rereadHostDetails);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> rereadMessageStoreSettings = (Map<String,Object>)rereadHostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ assertEquals("Unexpected config path", configPath, rereadMessageStoreSettings.get(MessageStore.STORE_PATH));
}
public void testPutCreateQueue() throws Exception
@@ -524,12 +535,15 @@ public class VirtualHostRestTest extends QpidRestTestCase
private int tryCreateVirtualHost(String hostName, String storeType, String storePath, String configPath) throws IOException,
JsonGenerationException, JsonMappingException
{
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, storePath);
+ messageStoreSettings.put(MessageStore.STORE_TYPE, storeType);
+
Map<String, Object> hostData = new HashMap<String, Object>();
hostData.put(VirtualHost.NAME, hostName);
hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- hostData.put(VirtualHost.STORE_PATH, storePath);
- hostData.put(VirtualHost.STORE_TYPE, storeType);
+ hostData.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index b2119ff79f..ec389e55f1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.security.acl.AbstractACLTestCase;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory;
import org.apache.qpid.server.security.group.FileGroupManagerFactory;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.systest.rest.QpidRestTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -976,10 +977,13 @@ public class BrokerACLTest extends QpidRestTestCase
private int createHost(String hostName) throws Exception
{
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, getStoreLocation(hostName));
+ messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
+
Map<String, Object> hostData = new HashMap<String, Object>();
hostData.put(VirtualHost.NAME, hostName);
- hostData.put(VirtualHost.STORE_PATH, getStoreLocation(hostName));
- hostData.put(VirtualHost.STORE_TYPE, getTestProfileMessageStoreType());
+ hostData.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
index 85e62f8dae..4026b7a6cb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -23,13 +23,6 @@ package org.apache.qpid.test.client.timeouts;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.SlowMessageStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -38,6 +31,14 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SlowMessageStore;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout
* This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure
@@ -59,9 +60,14 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
{
Map<String, Object> slowMessageStoreDelays = new HashMap<String,Object>();
slowMessageStoreDelays.put("postcommitTran", POST_COMMIT_DELAY);
+
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, SlowMessageStore.TYPE);
+ messageStoreSettings.put(SlowMessageStore.DELAYS, slowMessageStoreDelays);
+
TestBrokerConfiguration config = getBrokerConfiguration();
- config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_TYPE, SlowMessageStore.TYPE);
- config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, "slowMessageStoreDelays", slowMessageStoreDelays);
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
+
super.setUp();
//Set the syncWrite timeout to be just larger than the delay on the commitTran.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index ace34506bd..93cf90829d 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -64,6 +64,8 @@ import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
@@ -233,7 +235,16 @@ public class QpidBrokerTestCase extends QpidTestCase
configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT, Port.PORT, actualPort);
configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_RMI_PORT, Port.PORT, getManagementPort(actualPort));
configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_JMX_PORT, Port.PORT, getManagementPort(actualPort) + JMXPORT_CONNECTORSERVER_OFFSET);
+
+ String workDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator + actualPort;
+ Map<String, Object> virtualHostSettings = configuration.getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSettings = (Map<String, Object>)virtualHostSettings.get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ storeSettings.put(MessageStore.STORE_PATH, workDir);
+ configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, storeSettings);
}
+
return configuration;
}
@@ -433,10 +444,10 @@ public class QpidBrokerTestCase extends QpidTestCase
}
Set<Integer> portsUsedByBroker = guessAllPortsUsedByBroker(port);
+ String testConfig = saveTestConfiguration(port, testConfiguration);
if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker())
{
- String testConfig = saveTestConfiguration(port, testConfiguration);
setSystemProperty(BrokerProperties.PROPERTY_USE_CUSTOM_RMI_SOCKET_FACTORY, "false");
BrokerOptions options = new BrokerOptions();
@@ -460,9 +471,6 @@ public class QpidBrokerTestCase extends QpidTestCase
}
else if (!_brokerType.equals(BrokerType.EXTERNAL))
{
- String workDir = System.getProperty("QPID_WORK") + File.separator + "work" + File.separator + port;
- testConfiguration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_PATH, workDir);
- String testConfig = saveTestConfiguration(port, testConfiguration);
// Add the port to QPID_WORK to ensure unique working dirs for multi broker tests
final String qpidWork = getQpidWork(_brokerType, port);
@@ -834,7 +842,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
storeDir = ":memory:";
}
- else if (!"Memory".equals(storeType))
+ else if (!MemoryMessageStore.TYPE.equals(storeType))
{
storeDir = "${QPID_WORK}" + File.separator + virtualHostName + File.separator + brokerPort;
}
@@ -843,8 +851,10 @@ public class QpidBrokerTestCase extends QpidTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, virtualHostName);
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, storeType);
- attributes.put(VirtualHost.STORE_PATH, storeDir);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, storeType);
+ messageStoreSettings.put(MessageStore.STORE_PATH, storeDir);
+ attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings );
int port = getPort(brokerPort);
getBrokerConfiguration(port).addVirtualHostConfiguration(attributes);
}