diff options
author | Keith Wall <kwall@apache.org> | 2014-03-14 16:39:47 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-03-14 16:39:47 +0000 |
commit | ec486999608568e37a55dc9c81d9be133d95ebc3 (patch) | |
tree | 87d6446e97cfdca321b1faff6f24a3010df4cdff | |
parent | db26915f9b2edfa410c094162bec78b9d2010b24 (diff) | |
download | qpid-python-ec486999608568e37a55dc9c81d9be133d95ebc3.tar.gz |
QPID-5624: Introduce messageStoreSettings VH attribute and move all message store related attributes into messageStoreSettings map
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577606 13f79535-47bb-0310-9956-ffa450edef68
54 files changed, 939 insertions, 366 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java index 16199d30a3..24d7513c5f 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.MBeanProvider; import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHostFactory; import org.apache.qpid.server.store.berkeleydb.BDBMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; @@ -48,8 +49,7 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider @Override public boolean isChildManageableByMBean(ConfiguredObject child) { - return (child instanceof VirtualHost - && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE))); + return (child instanceof VirtualHost && BDBHAVirtualHostFactory.TYPE.equals(child.getType())); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index b2ec96f9f8..6fb84b8a4d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -23,6 +23,8 @@ import java.util.Map; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -52,11 +54,18 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory @Override public void validateAttributes(Map<String, Object> attributes) { - validateAttribute(org.apache.qpid.server.model.VirtualHost.STORE_PATH, String.class, attributes); - validateAttribute("haGroupName", String.class, attributes); - validateAttribute("haNodeName", String.class, attributes); - validateAttribute("haNodeAddress", String.class, attributes); - validateAttribute("haHelperAddress", String.class, attributes); + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS); + if (messageStoreSettings == null) + { + throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required."); + } + + validateAttribute(MessageStore.STORE_PATH, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_NAME, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, String.class, messageStoreSettings); + validateAttribute(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, String.class, messageStoreSettings); } private void validateAttribute(String attrName, Class<?> clazz, Map<String, Object> attributes) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 35dae4b800..c8550b2114 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import com.sleepycat.bind.tuple.ByteBinding; -import com.sleepycat.bind.tuple.IntegerBinding; -import com.sleepycat.bind.tuple.LongBinding; -import com.sleepycat.je.*; -import com.sleepycat.je.Transaction; - import java.io.File; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -36,16 +30,32 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.*; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StateManager; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMemoryMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; @@ -59,6 +69,21 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.apache.qpid.util.FileUtils; +import com.sleepycat.bind.tuple.ByteBinding; +import com.sleepycat.bind.tuple.IntegerBinding; +import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Cursor; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.LockConflictException; +import com.sleepycat.je.LockMode; +import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; + /** * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log. * @@ -72,8 +97,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); public static final int VERSION = 7; - public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - private static final int LOCK_RETRY_ATTEMPTS = 5; private static String CONFIGURED_OBJECTS_DB_NAME = "CONFIGURED_OBJECTS"; private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; @@ -119,7 +142,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory) { - _type = environmentFacadeFactory.getType();; + _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; _stateManager = new StateManager(_eventManager); } @@ -218,8 +241,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException { - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? -1l : overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java index 8f2086a25c..e2b30f6740 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java @@ -52,12 +52,14 @@ public class BDBMessageStoreFactory implements MessageStoreFactory, DurableConfi @Override public void validateAttributes(Map<String, Object> attributes) { - if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); + Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH); if(!(storePath instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH +"' is required and must be of type String."); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index b784e436b9..d242790efb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -24,8 +24,9 @@ import org.apache.qpid.server.model.VirtualHost; public interface EnvironmentFacadeFactory { + public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore); String getType(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 384ceba98a..7fdae6b3ee 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -26,30 +26,32 @@ import java.util.Map; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore) { + String name = virtualHost.getName(); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - Object environmentConfigurationAttributes = virtualHost.getAttribute(BDBMessageStore.ENVIRONMENT_CONFIGURATION); + Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); if (environmentConfigurationAttributes instanceof Map) { envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } - String name = virtualHost.getName(); final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; String storeLocation; if(isMessageStore) { - storeLocation = (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); if(storeLocation == null) { storeLocation = defaultPath; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index cd53afe891..4df62b1d0f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Map; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; @@ -32,47 +33,56 @@ import com.sleepycat.je.Durability.SyncPolicy; public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { - + public static final String DURABILITY = "haDurability"; + public static final String GROUP_NAME = "haGroupName"; + public static final String HELPER_ADDRESS = "haHelperAddress"; + public static final String NODE_ADDRESS = "haNodeAddress"; + public static final String NODE_NAME = "haNodeName"; + public static final String REPLICATION_CONFIG = "haReplicationConfig"; + public static final String COALESCING_SYNC = "haCoalescingSync"; + public static final String DESIGNATED_PRIMARY = "haDesignatedPrimary"; + private static final int DEFAULT_NODE_PRIORITY = 1; private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); private static final boolean DEFAULT_COALESCING_SYNC = true; - - @Override - public EnvironmentFacade createEnvironmentFacade(final VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore) { + final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @Override public boolean isDesignatedPrimary() { - return convertBoolean(virtualHost.getAttribute("haDesignatedPrimary"), false); + return convertBoolean(messageStoreSettings.get(DESIGNATED_PRIMARY), false); } @Override public boolean isCoalescingSync() { - return convertBoolean(virtualHost.getAttribute("haCoalescingSync"), DEFAULT_COALESCING_SYNC); + return convertBoolean(messageStoreSettings.get(COALESCING_SYNC), DEFAULT_COALESCING_SYNC); } @Override public String getStorePath() { - return (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + return (String) messageStoreSettings.get(MessageStore.STORE_PATH); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getParameters() { - return (Map<String, String>) virtualHost.getAttribute("bdbEnvironmentConfig"); + return (Map<String, String>) messageStoreSettings.get(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION); } + @SuppressWarnings("unchecked") @Override public Map<String, String> getReplicationParameters() { - return (Map<String, String>) virtualHost.getAttribute("haReplicationConfig"); + return (Map<String, String>) messageStoreSettings.get(REPLICATION_CONFIG); } @Override @@ -87,36 +97,35 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return DEFAULT_NODE_PRIORITY; } - - @Override public String getName() { - return (String)virtualHost.getAttribute("haNodeName"); + return (String)messageStoreSettings.get(NODE_NAME); } @Override public String getHostPort() { - return (String)virtualHost.getAttribute("haNodeAddress"); + return (String)messageStoreSettings.get(NODE_ADDRESS); } @Override public String getHelperHostPort() { - return (String)virtualHost.getAttribute("haHelperAddress"); + return (String)messageStoreSettings.get(HELPER_ADDRESS); } @Override public String getGroupName() { - return (String)virtualHost.getAttribute("haGroupName"); + return (String)messageStoreSettings.get(GROUP_NAME); } @Override public String getDurability() { - return virtualHost.getAttribute("haDurability") == null ? DEFAULT_DURABILITY.toString() : (String)virtualHost.getAttribute("haDurability"); + String durability = (String)messageStoreSettings.get(DURABILITY); + return durability == null ? DEFAULT_DURABILITY.toString() : durability; } }; return new ReplicatedEnvironmentFacade(configuration); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 4684358190..65830fd1c2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; +import java.util.HashMap; import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); @@ -59,16 +60,22 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; } + @Override - protected void applyStoreSpecificConfiguration(VirtualHost virtualHost) + protected VirtualHost<?> createVirtualHost(String storeLocation) { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + VirtualHost<?> vhost = mock(VirtualHost.class); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); + messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); + messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); - when(virtualHost.getAttribute(eq("bdbEnvironmentConfig"))).thenReturn(envMap); - when(virtualHost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(virtualHost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); - + messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap); + when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings); + when(vhost.getName()).thenReturn("test"); + return vhost; } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java index da34e191f7..2caf85966c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java @@ -38,7 +38,9 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -101,33 +103,31 @@ public class VirtualHostTest extends QpidTestCase String nodeHostPort = "localhost:" + findFreePort(); String helperHostPort = nodeHostPort; String durability = "NO_SYNC,SYNC,NONE"; - String hostName = getName(); + String virtualHostName = getName(); - Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); - virtualHostAttributes.put("haNodeName", nodeName); - virtualHostAttributes.put("haGroupName", groupName); - virtualHostAttributes.put("haNodeAddress", nodeHostPort); - virtualHostAttributes.put("haHelperAddress", helperHostPort); - virtualHostAttributes.put("haDurability", durability); - virtualHostAttributes.put(VirtualHost.STORE_PATH, _bdbStorePath.getAbsolutePath()); - virtualHostAttributes.put("haReplicationConfig", + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, nodeHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, helperHostPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.DURABILITY, durability); + + messageStoreSettings.put(MessageStore.STORE_PATH, _bdbStorePath.getAbsolutePath()); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout)); - virtualHostAttributes.put(VirtualHost.NAME, hostName); + + Map<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put(VirtualHost.NAME, virtualHostName); virtualHostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + virtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); _host = createHost(virtualHostAttributes); _host.setDesiredState(State.INITIALISING, State.ACTIVE); - assertEquals("Unexpected host name", hostName, _host.getName()); + assertEquals("Unexpected virtual host name", virtualHostName, _host.getName()); assertEquals("Unexpected host type", BDBHAVirtualHostFactory.TYPE, _host.getType()); - assertEquals("Unexpected store type", ReplicatedEnvironmentFacade.TYPE, _host.getAttribute(VirtualHost.STORE_TYPE)); - - assertEquals(nodeName, _host.getAttribute("haNodeName")); - assertEquals(groupName, _host.getAttribute("haGroupName")); - assertEquals(nodeHostPort, _host.getAttribute("haNodeAddress")); - assertEquals(helperHostPort, _host.getAttribute("haHelperAddress")); - assertEquals(durability, _host.getAttribute("haDurability")); - assertEquals("Unexpected store path", _bdbStorePath.getAbsolutePath(), _host.getAttribute(VirtualHost.STORE_PATH)); + + assertEquals(messageStoreSettings, _host.getMessageStoreSettings()); BDBMessageStore messageStore = (BDBMessageStore) _host.getMessageStore(); ReplicatedEnvironment environment = (ReplicatedEnvironment) messageStore.getEnvironmentFacade().getEnvironment(); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java index b6a178ac8a..67c89718f6 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBBackupTest.java @@ -32,6 +32,7 @@ import javax.jms.Session; import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.test.utils.Piper; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.util.FileUtils; @@ -61,7 +62,8 @@ public class BDBBackupTest extends QpidBrokerTestCase _backupToDir = new File(SYSTEM_TMP_DIR + File.separator + getTestName()); _backupToDir.mkdirs(); Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TEST_VHOST); - _backupFromDir = new File((String)virtualHostAttributes.get(VirtualHost.STORE_PATH)); + Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + _backupFromDir = new File((String)messageStoreSettings.get(MessageStore.STORE_PATH)); boolean fromDirExistsAndIsDir = _backupFromDir.isDirectory(); assertTrue("backupFromDir " + _backupFromDir + " should already exist", fromDirExistsAndIsDir); } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 3d6a2bac67..cb56a60119 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -45,6 +45,7 @@ import javax.management.openmbean.TabularDataSupport; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -75,7 +76,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase private static final String QUEUE_NAME="myUpgradeQueue"; private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; - private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; private String _storeLocation; @@ -84,7 +85,9 @@ public class BDBUpgradeTest extends QpidBrokerTestCase { assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); Map<String, Object> virtualHostAttributes = getBrokerConfiguration().getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); - _storeLocation = (String)virtualHostAttributes.get(VirtualHost.STORE_PATH); + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) virtualHostAttributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + _storeLocation = (String)messageStoreSettings.get(MessageStore.STORE_PATH); //Clear the two target directories if they exist. File directory = new File(_storeLocation); @@ -102,11 +105,6 @@ public class BDBUpgradeTest extends QpidBrokerTestCase super.setUp(); } - private String getWorkDirBaseDir() - { - return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); - } - /** * Test that the selector applied to the DurableSubscription was successfully * transfered to the new store, and functions as expected with continued use @@ -505,7 +503,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase return send; } - + /** * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. * diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index bef35e163f..e8d18971ad 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -184,7 +184,7 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved); final int newBdbPort = getNextAvailable(oldBdbPort + 1); - storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), "localhost", newBdbPort); + storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort); _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 1a65b095b4..4efe1967ce 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -43,8 +43,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.url.URLSyntaxException; import com.sleepycat.je.rep.ReplicationConfig; @@ -101,19 +103,22 @@ public class HATestClusterCreator _bdbHelperPort = bdbPort; } - TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); - brokerConfiguration.addJmxManagementConfiguration(); String nodeName = getNodeNameForNodeAt(bdbPort); - brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); - brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haGroupName", _groupName); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeName", nodeName); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haNodeAddress", getNodeHostPortForNodeAt(bdbPort)); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haHelperAddress", getHelperHostPort()); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort()); Map<String, String> repSettings = new HashMap<String, String>(); repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); - brokerConfiguration.setObjectAttribute(_virtualHostName, "haReplicationConfig", repSettings ); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings ); + + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); brokerPort = _testcase.getNextAvailable(bdbPort + 1); } @@ -127,7 +132,10 @@ public class HATestClusterCreator throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); } TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); - config.setObjectAttribute("test", "haDesignatedPrimary", designatedPrimary); + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); config.setSaved(false); } @@ -360,12 +368,15 @@ public class HATestClusterCreator public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) { TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); - config.setObjectAttribute(_virtualHostName, "haNodeAddress", "localhost:" + newBdbPort); - String oldBdbHostPort = (String)config.getObjectAttributes(_virtualHostName).get("haNodeAddress"); + + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS); String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); String oldHost = oldHostAndPort[0]; String newBdbHostPort = oldHost + ":" + newBdbPort; - config.setObjectAttribute(_virtualHostName, "haNodeAddress", newBdbHostPort); + storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); config.setSaved(false); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java index 124584e99c..16da78c988 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/StoreUpgrader.java @@ -19,10 +19,12 @@ package org.apache.qpid.server.configuration.startup;/* * */ +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; + import org.apache.qpid.server.configuration.ConfigurationEntry; import org.apache.qpid.server.configuration.ConfigurationEntryStore; import org.apache.qpid.server.model.Broker; @@ -134,6 +136,110 @@ public abstract class StoreUpgrader } }; + final static StoreUpgrader UPGRADE_1_3 = new StoreUpgrader("1.3") + { + private final String[] HA_ATTRIBUTES = {"haNodeName", "haGroupName", "haHelperAddress", "haCoalescingSync", "haNodeAddress","haDurability","haDesignatedPrimary","haReplicationConfig","bdbEnvironmentConfig"}; + private final String[] JDBC_ATTRIBUTES = {"connectionURL", "connectionPool", "jdbcBigIntType", "jdbcBytesForBlob", "jdbcVarbinaryType", "jdbcBlobType", "partitionCount", "maxConnectionsPerPartition", "minConnectionsPerPartition"}; + private final String[] STORE_TYPES = {"BDB", "BDB-HA", "JDBC", "Memory", "DERBY"}; + + @Override + protected void doUpgrade(ConfigurationEntryStore store) + { + ConfigurationEntry root = store.getRootEntry(); + Map<String, Collection<ConfigurationEntry>> children = root.getChildren(); + Collection<ConfigurationEntry> vhosts = children.get("VirtualHost"); + Collection<ConfigurationEntry> changed = new ArrayList<ConfigurationEntry>(); + for(ConfigurationEntry vhost : vhosts) + { + Map<String, Object> attributes = vhost.getAttributes(); + Map<String, Object> newAttributes = new HashMap<String, Object>(attributes); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + String storeType = (String) attributes.get("storeType"); + String realStoreType = storeType; + for (String type : STORE_TYPES) + { + if (type.equalsIgnoreCase(storeType)) + { + realStoreType = type; + break; + } + } + if(attributes.containsKey("storeType")) + { + newAttributes.remove("storeType"); + messageStoreSettings.put("storeType", realStoreType); + } + if (attributes.containsKey("storePath")) + { + messageStoreSettings.put("storePath", newAttributes.remove("storePath")); + } + if (attributes.containsKey("storeUnderfullSize")) + { + messageStoreSettings.put("storeUnderfullSize", newAttributes.remove("storeUnderfullSize")); + } + if (attributes.containsKey("storeOverfullSize")) + { + messageStoreSettings.put("storeOverfullSize", newAttributes.remove("storeOverfullSize")); + } + + if ("BDB_HA".equals(attributes.get("type"))) + { + for (String haAttribute : HA_ATTRIBUTES) + { + if(attributes.containsKey(haAttribute)) + { + messageStoreSettings.put(haAttribute, newAttributes.remove(haAttribute)); + } + } + messageStoreSettings.remove("storeType"); + } + else + { + + if ("JDBC".equalsIgnoreCase(realStoreType)) + { + boolean removeAttribute = !"JDBC".equals(attributes.get("configStoreType")); + for (String jdbcAttribute : JDBC_ATTRIBUTES) + { + if(attributes.containsKey(jdbcAttribute)) + { + Object value = null; + if (removeAttribute) + { + value = newAttributes.remove(jdbcAttribute); + } + else + { + value = newAttributes.get(jdbcAttribute); + } + messageStoreSettings.put(jdbcAttribute, value); + } + } + } + else if ("BDB".equals(realStoreType)) + { + if(attributes.containsKey("bdbEnvironmentConfig")) + { + messageStoreSettings.put("bdbEnvironmentConfig", newAttributes.remove("bdbEnvironmentConfig")); + } + } + } + + if (!messageStoreSettings.isEmpty()) + { + newAttributes.put("messageStoreSettings", messageStoreSettings); + changed.add(new ConfigurationEntry(vhost.getId(),vhost.getType(), newAttributes, vhost.getChildrenIds(), store)); + } + } + Map<String, Object> attributes = new HashMap<String, Object>(root.getAttributes()); + attributes.put(Broker.MODEL_VERSION, "1.4"); + changed.add(new ConfigurationEntry(root.getId(), root.getType(), attributes, root.getChildrenIds(),store)); + + store.save(changed.toArray(new ConfigurationEntry[changed.size()])); + + } + }; + private StoreUpgrader(String version) { _upgraders.put(version, this); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index 010d74eb7f..7b03946680 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -60,6 +60,7 @@ import org.codehaus.jackson.node.ArrayNode; public class MemoryConfigurationEntryStore implements ConfigurationEntryStore { + public static final String STORE_TYPE = "memory"; private static final String DEFAULT_BROKER_NAME = "Broker"; @@ -545,6 +546,10 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore } else if (fieldNode.isObject()) { + if (attributes == null) + { + attributes = new HashMap<String, Object>(); + } attributes.put(fieldName, toObject(fieldNode) ); } else diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java index f940b323be..c48c7bb7f6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java @@ -34,10 +34,11 @@ public class Model * * 1.0 Initial version * 1.1 Addition of mandatory virtual host type / different types of virtual host - * + * 1.3 Truststore/Keystore type => trustStoreType / type => keyStoreType + * 1.4 Separate messageStoreSettings from virtualhost */ public static final int MODEL_MAJOR_VERSION = 1; - public static final int MODEL_MINOR_VERSION = 3; + public static final int MODEL_MINOR_VERSION = 4; public static final String MODEL_VERSION = MODEL_MAJOR_VERSION + "." + MODEL_MINOR_VERSION; private static final Model MODEL_INSTANCE = new Model(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 827c01f70f..46aa8dcc8e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.model; +import java.security.AccessControlException; +import java.util.Collection; +import java.util.Map; + import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import java.security.AccessControlException; -import java.util.Collection; -import java.util.Map; - @ManagedObject( managesChildren = true ) public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<X> { @@ -48,17 +48,19 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject< String STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "storeTransactionIdleTimeoutWarn"; String STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "storeTransactionOpenTimeoutClose"; String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn"; - String STORE_TYPE = "storeType"; - String STORE_PATH = "storePath"; - String CONFIG_STORE_TYPE = "configStoreType"; - String CONFIG_STORE_PATH = "configStorePath"; String SUPPORTED_EXCHANGE_TYPES = "supportedExchangeTypes"; String SUPPORTED_QUEUE_TYPES = "supportedQueueTypes"; String DURABLE = "durable"; String LIFETIME_POLICY = "lifetimePolicy"; String SECURITY_ACL = "securityAcl"; String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount"; + String CONFIGURATION_STORE_SETTINGS = "configurationStoreSettings"; + String MESSAGE_STORE_SETTINGS = "messageStoreSettings"; + @Deprecated + String CONFIG_STORE_TYPE = "configStoreType"; + @Deprecated + String CONFIG_STORE_PATH = "configStorePath"; // Attributes int CURRENT_CONFIG_VERSION = 4; @@ -85,18 +87,14 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject< long getQueue_flowResumeSizeBytes(); @ManagedAttribute + @Deprecated String getConfigStoreType(); @ManagedAttribute + @Deprecated String getConfigStorePath(); @ManagedAttribute - String getStoreType(); - - @ManagedAttribute - String getStorePath(); - - @ManagedAttribute long getStoreTransactionIdleTimeoutClose(); @ManagedAttribute @@ -129,6 +127,12 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject< @ManagedAttribute int getHouseKeepingThreadCount(); + @ManagedAttribute + Map<String, Object> getMessageStoreSettings(); + + @ManagedAttribute + Map<String, Object> getConfigurationStoreSettings(); + @ManagedStatistic long getQueueCount(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 82051c3a41..0060703792 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -49,6 +49,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ParameterizedTypeImpl; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; @@ -66,8 +67,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ put(NAME, String.class); put(TYPE, String.class); - put(STORE_PATH, String.class); - put(STORE_TYPE, String.class); put(STATE, State.class); put(QUEUE_ALERT_REPEAT_GAP, Long.class); @@ -86,6 +85,9 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo put(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Long.class); put(STORE_TRANSACTION_OPEN_TIMEOUT_WARN, Long.class); + put(MESSAGE_STORE_SETTINGS, new ParameterizedTypeImpl(Map.class, String.class, Object.class)); + put(CONFIGURATION_STORE_SETTINGS, new ParameterizedTypeImpl(Map.class, String.class, Object.class)); + put(CONFIG_STORE_TYPE, String.class); put(CONFIG_STORE_PATH, String.class); @@ -126,20 +128,14 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo { throw new IllegalConfigurationException("Virtual host type must be specified"); } - } - - private void validateAttributes(String type) - { final VirtualHostFactory factory = VirtualHostFactory.FACTORIES.get(type); if(factory == null) { throw new IllegalArgumentException("Unknown virtual host type '"+ type +"'. Valid types are: " + VirtualHostFactory.TYPES.get()); } factory.validateAttributes(getActualAttributes()); - } - public Collection<VirtualHostAlias> getAliases() { return Collections.unmodifiableCollection(_aliases); @@ -737,14 +733,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo { // TODO } - else if(STORE_TYPE.equals(name)) - { - return _virtualHost.getMessageStore().getStoreType(); - } - else if(STORE_PATH.equals(name)) - { - return _virtualHost.getMessageStore().getStoreLocation(); - } } return super.getAttribute(name); } @@ -832,18 +820,6 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo } @Override - public String getStoreType() - { - return _virtualHost.getMessageStore().getStoreType(); - } - - @Override - public String getStorePath() - { - return _virtualHost.getMessageStore().getStoreLocation(); - } - - @Override public long getStoreTransactionIdleTimeoutClose() { return (Long)getAttribute(VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); @@ -897,6 +873,20 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo return (Long)getAttribute(VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); } + @SuppressWarnings("unchecked") + @Override + public Map<String, Object> getMessageStoreSettings() + { + return (Map<String, Object>)getAttribute(VirtualHost.MESSAGE_STORE_SETTINGS); + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, Object> getConfigurationStoreSettings() + { + return (Map<String, Object>)getAttribute(VirtualHost.CONFIGURATION_STORE_SETTINGS); + } + @Override public long getQueueCount() { @@ -1118,4 +1108,5 @@ public final class VirtualHostAdapter extends AbstractConfiguredObject<VirtualHo { return super.getTaskExecutor(); } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java index 546f2fa05d..eb981bdd02 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/JDBCConnectionProviderFactory.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; + import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.jdbc.ConnectionProvider; @@ -32,7 +33,7 @@ public interface JDBCConnectionProviderFactory extends Pluggable { String getType(); - ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost) + ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly) throws SQLException; static final class TYPES diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index 626b999d72..b3a6216c84 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -28,6 +28,11 @@ import org.apache.qpid.server.model.VirtualHost; */ public interface MessageStore { + String STORE_TYPE = "storeType"; + String STORE_PATH = "storePath"; + String UNDERFULL_SIZE = "storeUnderfullSize"; + String OVERFULL_SIZE = "storeOverfullSize"; + /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java deleted file mode 100644 index 93b669e6e4..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -public class MessageStoreConstants -{ - - public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; - public static final String OVERFULL_SIZE_PROPERTY = "overfull-size"; - public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size"; - public static final String OVERFULL_SIZE_ATTRIBUTE = "storeOverfullSize"; - public static final String UNDERFULL_SIZE_ATTRIBUTE = "storeUnderfullSize"; - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java index 3543ce3bcf..b759bd5dc4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java @@ -321,21 +321,7 @@ public class MapValueConverter else if (typeObject instanceof ParameterizedType) { ParameterizedType parameterizedType= (ParameterizedType)typeObject; - Type type = parameterizedType.getRawType(); - if (type == Set.class) - { - Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); - if (actualTypeArguments.length != 1) - { - throw new IllegalArgumentException("Set type argument is not specified"); - } - Class<?> classObject = (Class<?>)actualTypeArguments[0]; - value = toSet(rawValue, classObject, attributeName); - } - else - { - throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported"); - } + value = convertParameterizedType(rawValue, parameterizedType, attributeName); } else { @@ -352,6 +338,62 @@ public class MapValueConverter return attributes; } + private static Object convertParameterizedType(Object rawValue, ParameterizedType parameterizedType, String attributeName) + { + Type type = parameterizedType.getRawType(); + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + Object convertedValue; + if (type == Set.class) + { + if (actualTypeArguments.length != 1) + { + throw new IllegalArgumentException("Unexpected number of Set type arguments " + actualTypeArguments.length); + } + Class<?> classObject = (Class<?>)actualTypeArguments[0]; + convertedValue = toSet(rawValue, classObject, attributeName); + } + else if (type == Map.class) + { + if (actualTypeArguments.length != 2) + { + throw new IllegalArgumentException("Unexpected number of Map type arguments " + actualTypeArguments.length); + } + Class<?> keyClassObject = (Class<?>)actualTypeArguments[0]; + Class<?> valueClassObject = (Class<?>)actualTypeArguments[1]; + convertedValue = toMap(rawValue, keyClassObject, valueClassObject, attributeName); + } + else + { + throw new IllegalArgumentException("Conversion into " + parameterizedType + " is not yet supported"); + } + return convertedValue; + } + + private static <K,V> Map<K, V> toMap(Object rawValue, Class<K> keyClassObject, Class<V> valueClassObject, String attributeName) + { + if (rawValue == null) + { + return null; + } + if (rawValue instanceof Map) + { + Map<K, V> convertedMap = new HashMap<K, V>(); + Map<?, ?> rawMap = (Map<?,?>)rawValue; + + for (Map.Entry<?, ?> entry : rawMap.entrySet()) + { + K convertedKey = convert(entry.getKey(), keyClassObject, attributeName + " (map key)"); + V convertedValue = convert(entry.getValue(), valueClassObject, attributeName + " (map value)"); + convertedMap.put(convertedKey, convertedValue); + } + return convertedMap; + } + else + { + throw new IllegalArgumentException("rawValue is not of unexpected type Map, was : " + rawValue.getClass()); + } + } + public static <T> Set<T> toSet(Object rawValue, Class<T> setItemClass, String attributeName) { if (rawValue == null) @@ -361,7 +403,7 @@ public class MapValueConverter HashSet<T> set = new HashSet<T>(); if (rawValue instanceof Iterable) { - Iterable<?> iterable = (Iterable<?>)rawValue; + Iterable<?> iterable = (Iterable<?>)rawValue; for (Object object : iterable) { T converted = convert(object, setItemClass, attributeName); @@ -409,6 +451,10 @@ public class MapValueConverter { value = toEnum(attributeName, rawValue, (Class<Enum>) classObject); } + else if (classObject == Object.class) + { + value = rawValue; + } else { throw new IllegalArgumentException("Cannot convert '" + rawValue + "' of type '" + rawValue.getClass() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index da2ef47670..47c50115d3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -19,6 +19,8 @@ package org.apache.qpid.server.virtualhost;/* * */ +import java.util.Map; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; @@ -46,10 +48,9 @@ public class StandardVirtualHost extends AbstractVirtualHost - private MessageStore initialiseMessageStore(VirtualHost virtualHost) + private MessageStore initialiseMessageStore(String storeType) { - final String storeTypeAttr = (String) virtualHost.getAttribute(VirtualHost.STORE_TYPE); - MessageStore messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeTypeAttr).createMessageStore(); + MessageStore messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore(); MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName()); @@ -83,7 +84,9 @@ public class StandardVirtualHost extends AbstractVirtualHost protected void initialiseStorage(VirtualHost virtualHost) { - _messageStore = initialiseMessageStore(virtualHost); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE); + _messageStore = initialiseMessageStore(storeType); _durableConfigurationStore = initialiseConfigurationStore(virtualHost); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java index 9cab87e3b4..7cc8eaa20c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHostFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; public class StandardVirtualHostFactory implements VirtualHostFactory { @@ -50,19 +51,26 @@ public class StandardVirtualHostFactory implements VirtualHostFactory @Override public void validateAttributes(Map<String, Object> attributes) { + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>)attributes.get(org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS); + if (messageStoreSettings == null) + { + throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + "' is required."); + } + + Object storeType = messageStoreSettings.get(MessageStore.STORE_TYPE); // need store type and path Collection<String> knownTypes = MessageStoreFactory.FACTORY_LOADER.getSupportedTypes(); - Object storeType = attributes.get(org.apache.qpid.server.model.VirtualHost.STORE_TYPE); if (storeType == null) { - throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE - +"' is required. Known types are : " + knownTypes); + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE + +"' is required in attribute " + org.apache.qpid.server.model.VirtualHost.MESSAGE_STORE_SETTINGS + ". Known types are : " + knownTypes); } else if (!(storeType instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE +"' is required and must be of type String. " +"Known types are : " + knownTypes); } @@ -70,7 +78,7 @@ public class StandardVirtualHostFactory implements VirtualHostFactory MessageStoreFactory factory = MessageStoreFactory.FACTORY_LOADER.get((String)storeType); if(factory == null) { - throw new IllegalArgumentException("Attribute '"+ org.apache.qpid.server.model.VirtualHost.STORE_TYPE + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_TYPE +"' has value '" + storeType + "' which is not one of the valid values: " + "Known types are : " + knownTypes); } diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json index 7173433aa8..efc98f6ac8 100644 --- a/qpid/java/broker-core/src/main/resources/initial-config.json +++ b/qpid/java/broker-core/src/main/resources/initial-config.json @@ -21,7 +21,7 @@ { "name": "Broker", "storeVersion": 1, - "modelVersion": "1.2", + "modelVersion": "1.4", "defaultVirtualHost" : "default", "authenticationproviders" : [ { "name" : "passwordFile", @@ -55,8 +55,10 @@ "virtualhosts" : [ { "name" : "default", "type" : "STANDARD", - "storeType" : "DERBY", - "storePath" : "${qpid.work_dir}/derbystore/default" + "messageStoreSettings" : { + "storeType" : "DERBY", + "storePath" : "${qpid.work_dir}/derbystore/default" + } } ], "plugins" : [ { "pluginType" : "MANAGEMENT-HTTP", diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java new file mode 100644 index 0000000000..1f435b502f --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/StoreUpgraderTest.java @@ -0,0 +1,217 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.startup; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.qpid.server.configuration.ConfigurationEntry; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.VirtualHost; + +public class StoreUpgraderTest extends TestCase +{ + + private final UUID _brokerId = UUID.randomUUID(); + private final UUID _virtualHostId = UUID.randomUUID(); + private ConfigurationEntryStore _store = mock(ConfigurationEntryStore.class); + + public void testUpgrade13To14_Derby() throws Exception + { + HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("name", "test"); + virtualHostAttributes.put("type", "STANDARD"); + virtualHostAttributes.put("storeType", "DERBy"); + virtualHostAttributes.put("storePath", "/mystorepath"); + virtualHostAttributes.put("storeUnderfullSize", 1000); + virtualHostAttributes.put("storeOverfullSize", 2000); + + doTest(_store, virtualHostAttributes); + + ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store); + ConfigurationEntry expectedNewVirtualHost; + { + Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>(); + expectedNewVirtualHostMessageSettings.put("storeType", "DERBY"); + expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath"); + expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000); + expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000); + + Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>(); + expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test"); + expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD"); + expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings); + + expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store); + } + verify(_store).save(expectedNewVirtualHost, expectNewRoot); + } + + public void testUpgrade13To14_BdbHa() throws Exception + { + HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("name", "test"); + virtualHostAttributes.put("type", "BDB_HA"); + virtualHostAttributes.put("storeType", "BdB-HA"); + virtualHostAttributes.put("storePath", "/mystorepath"); + virtualHostAttributes.put("storeUnderfullSize", 1000); + virtualHostAttributes.put("storeOverfullSize", 2000); + virtualHostAttributes.put("haNodeName", "node1"); + virtualHostAttributes.put("haGroupName", "group1"); + virtualHostAttributes.put("haHelperAddress", "helper:1000"); + virtualHostAttributes.put("haCoalescingSync", true); + virtualHostAttributes.put("haNodeAddress", "nodeaddr:1000"); + virtualHostAttributes.put("haDurability", "sync,sync,all"); + virtualHostAttributes.put("haDesignatedPrimary", true); + virtualHostAttributes.put("haReplicationConfig", Collections.singletonMap("hasettings", "havalue")); + virtualHostAttributes.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue")); + + doTest(_store, virtualHostAttributes); + + ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store); + ConfigurationEntry expectedNewVirtualHost; + { + Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>(); + expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath"); + expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000); + expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000); + expectedNewVirtualHostMessageSettings.put("haNodeName", "node1"); + expectedNewVirtualHostMessageSettings.put("haGroupName", "group1"); + expectedNewVirtualHostMessageSettings.put("haHelperAddress", "helper:1000"); + expectedNewVirtualHostMessageSettings.put("haCoalescingSync", true); + expectedNewVirtualHostMessageSettings.put("haNodeAddress", "nodeaddr:1000"); + expectedNewVirtualHostMessageSettings.put("haDurability", "sync,sync,all"); + expectedNewVirtualHostMessageSettings.put("haDesignatedPrimary", true); + expectedNewVirtualHostMessageSettings.put("haReplicationConfig", Collections.singletonMap("hasettings", "havalue")); + expectedNewVirtualHostMessageSettings.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue")); + + Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>(); + expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test"); + expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "BDB_HA"); + expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings); + + expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store); + } + verify(_store).save(expectedNewVirtualHost, expectNewRoot); + } + + public void testUpgrade13To14_Bdb() throws Exception + { + HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("name", "test"); + virtualHostAttributes.put("type", "STANDARD"); + virtualHostAttributes.put("storeType", "BdB"); + virtualHostAttributes.put("storePath", "/mystorepath"); + virtualHostAttributes.put("storeUnderfullSize", 1000); + virtualHostAttributes.put("storeOverfullSize", 2000); + virtualHostAttributes.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue")); + + doTest(_store, virtualHostAttributes); + + ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store); + ConfigurationEntry expectedNewVirtualHost; + { + Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>(); + expectedNewVirtualHostMessageSettings.put("storeType", "BDB"); + expectedNewVirtualHostMessageSettings.put("storePath", "/mystorepath"); + expectedNewVirtualHostMessageSettings.put("storeUnderfullSize", 1000); + expectedNewVirtualHostMessageSettings.put("storeOverfullSize", 2000); + expectedNewVirtualHostMessageSettings.put("bdbEnvironmentConfig", Collections.singletonMap("envsettings", "envvalue")); + + Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>(); + expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test"); + expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD"); + expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings); + + expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store); + } + verify(_store).save(expectedNewVirtualHost, expectNewRoot); + } + + public void testUpgrade13To14_JDBC() throws Exception + { + HashMap<String, Object> virtualHostAttributes = new HashMap<String, Object>(); + virtualHostAttributes.put("name", "test"); + virtualHostAttributes.put("type", "STANDARD"); + virtualHostAttributes.put("storeType", "JdBC"); + virtualHostAttributes.put("connectionURL", "jdbc:test"); + virtualHostAttributes.put("connectionPool", "BONECP"); + virtualHostAttributes.put("jdbcBigIntType", "NUMBER"); + virtualHostAttributes.put("jdbcBytesForBlob", true); + virtualHostAttributes.put("jdbcVarbinaryType", "TEST"); + virtualHostAttributes.put("jdbcBlobType", "BLOB"); + virtualHostAttributes.put("partitionCount", 10); + virtualHostAttributes.put("maxConnectionsPerPartition", 8); + virtualHostAttributes.put("minConnectionsPerPartition", 2); + + doTest(_store, virtualHostAttributes); + + ConfigurationEntry expectNewRoot = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.4"), Collections.singleton(_virtualHostId), _store); + ConfigurationEntry expectedNewVirtualHost; + { + Map<String, Object> expectedNewVirtualHostMessageSettings = new HashMap<String, Object>(); + expectedNewVirtualHostMessageSettings.put("storeType", "JDBC"); + expectedNewVirtualHostMessageSettings.put("connectionURL", "jdbc:test"); + expectedNewVirtualHostMessageSettings.put("connectionPool", "BONECP"); + expectedNewVirtualHostMessageSettings.put("jdbcBigIntType", "NUMBER"); + expectedNewVirtualHostMessageSettings.put("jdbcBytesForBlob", true); + expectedNewVirtualHostMessageSettings.put("jdbcVarbinaryType", "TEST"); + expectedNewVirtualHostMessageSettings.put("jdbcBlobType", "BLOB"); + expectedNewVirtualHostMessageSettings.put("partitionCount", 10); + expectedNewVirtualHostMessageSettings.put("maxConnectionsPerPartition", 8); + expectedNewVirtualHostMessageSettings.put("minConnectionsPerPartition", 2); + + Map<String, Object> expectedNewVirtualHostAttributes = new HashMap<String, Object>(); + expectedNewVirtualHostAttributes.put(VirtualHost.NAME, "test"); + expectedNewVirtualHostAttributes.put(VirtualHost.TYPE, "STANDARD"); + expectedNewVirtualHostAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, expectedNewVirtualHostMessageSettings); + + expectedNewVirtualHost = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), expectedNewVirtualHostAttributes, Collections.<UUID>emptySet(), _store); + } + verify(_store).save(expectedNewVirtualHost, expectNewRoot); + } + + private void doTest(ConfigurationEntryStore store, Map<String,Object> virtualHostAttributes) + { + final ConfigurationEntry virtualHostEntry = new ConfigurationEntry(_virtualHostId, VirtualHost.class.getSimpleName(), virtualHostAttributes, Collections.<UUID>emptySet(), store); + + final ConfigurationEntry rootEntry; + { + Map<String, Object> rootEntryAttributes = Collections.<String, Object>singletonMap(Broker.MODEL_VERSION, "1.3"); + rootEntry = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(), rootEntryAttributes, Collections.singleton(_virtualHostId), store); + } + + when(store.getRootEntry()).thenReturn(rootEntry); + when(store.getEntry(_virtualHostId)).thenReturn(virtualHostEntry); + + StoreUpgrader.UPGRADE_1_3.doUpgrade(store); + } + +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index 80f935a55e..0eea2663fd 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.configuration.startup; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -34,6 +35,7 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; @@ -54,8 +56,7 @@ public class VirtualHostRecovererTest extends TestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - - attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE)); when(entry.getAttributes()).thenReturn(attributes); VirtualHost host = recoverer.create(null, entry, parent); @@ -68,15 +69,9 @@ public class VirtualHostRecovererTest extends TestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); - attributes.put(VirtualHost.TYPE, "STANDARD"); - String[] mandatoryAttributes = {VirtualHost.NAME, VirtualHost.TYPE}; - - checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes); - - attributes = new HashMap<String, Object>(); - attributes.put(VirtualHost.NAME, getName()); - attributes.put(VirtualHost.STORE_TYPE, "MEMORY"); - mandatoryAttributes = new String[]{VirtualHost.NAME, VirtualHost.STORE_TYPE}; + attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE)); + String[] mandatoryAttributes = {VirtualHost.NAME, VirtualHost.TYPE, VirtualHost.MESSAGE_STORE_SETTINGS}; checkMandatoryAttributesAreValidated(mandatoryAttributes, attributes); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 2410df1fe0..266049e611 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.configuration.RecovererProvider; import org.apache.qpid.server.configuration.startup.VirtualHostRecoverer; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; @@ -88,7 +89,7 @@ public class VirtualHostTest extends QpidTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE)); attributes.put(VirtualHost.STATE, State.QUIESCED); VirtualHost host = createHost(attributes); @@ -149,7 +150,7 @@ public class VirtualHostTest extends QpidTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, getName()); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, Collections.singletonMap(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE)); VirtualHost host = createHost(attributes); return host; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index a9036a49a4..931fe36d5d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -84,16 +84,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private UUID _queueId; private UUID _exchangeId; private DurableConfigurationStore _configStore; + protected Map<String, Object> _messageStoreSettings; public void setUp() throws Exception { super.setUp(); + _messageStoreSettings = new HashMap<String, Object>(); _queueId = UUIDGenerator.generateRandomUUID(); _exchangeId = UUIDGenerator.generateRandomUUID(); _storeName = getName(); _storePath = TMP_FOLDER + File.separator + _storeName; + _messageStoreSettings.put(MessageStore.STORE_PATH, _storePath); FileUtils.delete(new File(_storePath), true); setTestSystemProperty("QPID_WORK", TMP_FOLDER); @@ -114,7 +117,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(_exchange.getId()).thenReturn(_exchangeId); when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); when(_exchange.getEventLogger()).thenReturn(new EventLogger()); - when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); + when(_virtualHost.getMessageStoreSettings()).thenReturn(_messageStoreSettings); _bindingArgs = new HashMap<String, Object>(); String argKey = AMQPFilterTypes.JMS_SELECTOR.toString(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 7b29a48d60..908f3fe6e1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -33,10 +36,6 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource { private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class); @@ -50,9 +49,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple private UUID _transactionResource; protected abstract MessageStore createStore() throws Exception; - - protected abstract void applyStoreSpecificConfiguration(VirtualHost virtualHost); - + protected abstract VirtualHost<?> createVirtualHost(String storeLocation); protected abstract int getNumberOfMessagesToFillStore(); @Override @@ -64,11 +61,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple FileUtils.delete(_storeLocation, true); - VirtualHost vhost = mock(VirtualHost.class); - when(vhost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation.getAbsolutePath()); - when(vhost.getName()).thenReturn("test"); - - applyStoreSpecificConfiguration(vhost); + VirtualHost<?> vhost = createVirtualHost(_storeLocation.getAbsolutePath()); _store = createStore(); ((DurableConfigurationStore)_store).configureConfigStore(vhost, null); @@ -82,6 +75,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple _store.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } + @Override public void tearDown() throws Exception { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java index 3ee98f9a21..1996620950 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import java.util.EnumSet; + import junit.framework.TestCase; public class StateManagerTest extends TestCase implements EventListener @@ -115,7 +116,7 @@ public class StateManagerTest extends TestCase implements EventListener performValidTransition(StateManager.INITIALISE_COMPLETE); performValidTransition(StateManager.CLOSE_INITIALISED); performValidTransition(StateManager.CLOSE_COMPLETE); - + _manager = new StateManager(this); performValidTransition(StateManager.INITIALISE); performValidTransition(StateManager.INITIALISE_COMPLETE); @@ -141,13 +142,13 @@ public class StateManagerTest extends TestCase implements EventListener performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED); performInvalidTransitions(StateManager.INITIALISE_COMPLETE, State.ACTIVATING, State.CLOSING); - performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE); + performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE, State.CLOSING); performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED); performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED); performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING); performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED); performInvalidTransitions(StateManager.CLOSE_COMPLETE); - + } private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates) @@ -156,7 +157,7 @@ public class StateManagerTest extends TestCase implements EventListener { performValidTransition(preTransition); } - + EnumSet<State> endStates = EnumSet.allOf(State.class); if(validEndStates != null) @@ -166,13 +167,13 @@ public class StateManagerTest extends TestCase implements EventListener endStates.remove(state); } } - + for(State invalidEndState : endStates) { performInvalidStateTransition(invalidEndState); } - + } private void performInvalidStateTransition(State invalidEndState) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 0dc25a2ad2..fd56f3fa1c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; @@ -108,8 +109,10 @@ public class BrokerTestHelper when(virtualHost.getType()).thenReturn(StandardVirtualHostFactory.TYPE); when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE); - when(virtualHost.getStoreType()).thenReturn(TestableMemoryMessageStore.TYPE); - when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)).thenReturn(TestableMemoryMessageStore.TYPE); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE); + + when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(virtualHost.getName()).thenReturn(name); return createVirtualHost(virtualHostRegistry, virtualHost); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index 570e748d7a..35b4b89bf6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.UUID; import org.apache.qpid.server.binding.BindingImpl; - import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -42,7 +41,9 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.JsonFileConfigStore; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -249,13 +250,18 @@ public class StandardVirtualHostTest extends QpidTestCase private VirtualHost createVirtualHost(String virtualHostName) throws Exception { Broker<?> broker = BrokerTestHelper.createBrokerMock(); + _virtualHostRegistry = broker.getVirtualHostRegistry(); org.apache.qpid.server.model.VirtualHost<?> model = mock(org.apache.qpid.server.model.VirtualHost.class); when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.CONFIG_STORE_TYPE)).thenReturn(JsonFileConfigStore.TYPE); when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.CONFIG_STORE_PATH)).thenReturn(_storeFolder.getAbsolutePath()); - when(model.getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TYPE)).thenReturn(TestMemoryMessageStore.TYPE); + + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE); + when(model.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(model.getName()).thenReturn(virtualHostName); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(broker, false), model); _virtualHostRegistry.registerVirtualHost(host); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 5d56329c20..25ce3b8adc 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -32,6 +32,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.AbstractJDBCMessageStore; @@ -39,7 +41,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.util.FileUtils; @@ -130,13 +131,25 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa { //Update to pick up QPID_WORK and use that as the default location not just derbyDB + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); _driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME); - String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; - String databasePath = isConfigStoreOnly() ? (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH) : (String) virtualHost.getAttribute(VirtualHost.STORE_PATH); + String databasePath = null; + if (isConfigStoreOnly()) + { + databasePath = (String) virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH); + } + else + { + if (messageStoreSettings != null) + { + databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH); + } + } + if(databasePath == null) { - databasePath = defaultPath; + databasePath = System.getProperty("QPID_WORK") + File.separator + "derbyDB"; } if(!MEMORY_STORE_LOCATION.equals(databasePath)) @@ -154,8 +167,8 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa _storeLocation = databasePath; - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? -1l : overfullAttr instanceof Number ? ((Number) overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java index d64420a808..4e81c4e9ba 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java @@ -52,12 +52,15 @@ public class DerbyMessageStoreFactory implements MessageStoreFactory, DurableCon @Override public void validateAttributes(Map<String, Object> attributes) { - if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + + if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE))) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); + Object storePath = messageStoreSettings.get(MessageStore.STORE_PATH); if(!(storePath instanceof String)) { - throw new IllegalArgumentException("Attribute '"+ VirtualHost.STORE_PATH + throw new IllegalArgumentException("Setting '"+ MessageStore.STORE_PATH +"' is required and must be of type String."); } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index 479675dac1..f23b5a3e23 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -20,15 +20,17 @@ */ package org.apache.qpid.server.store.derby; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); @@ -50,17 +52,26 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes } @Override - protected void applyStoreSpecificConfiguration(VirtualHost vhost) + protected MessageStore createStore() throws Exception { - _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - - when(vhost.getAttribute(eq(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE))).thenReturn(OVERFULL_SIZE); - when(vhost.getAttribute(eq(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE))).thenReturn(UNDERFULL_SIZE); + return new DerbyMessageStore(); } @Override - protected MessageStore createStore() throws Exception + protected VirtualHost<?> createVirtualHost(String storeLocation) { - return new DerbyMessageStore(); + _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + VirtualHost<?> vhost = mock(VirtualHost.class); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); + messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); + messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); + + when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings ); + when(vhost.getName()).thenReturn("test"); + + return vhost; } + } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index 859fad629b..e3f91cc8fb 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.server.store.derby; +import static org.mockito.Mockito.when; + import java.io.File; +import java.util.HashMap; +import java.util.Map; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.util.FileUtils; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - public class DerbyMessageStoreTest extends MessageStoreTestCase { private String _storeLocation; @@ -63,7 +64,9 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception { _storeLocation = TMP_FOLDER + File.separator + getTestName(); - when(virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storeLocation); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation); + when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); deleteStoreIfExists(); } diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java index 1d9ff9a8e1..370f92651c 100644 --- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java +++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProvider.java @@ -20,29 +20,47 @@ */ package org.apache.qpid.server.store.jdbc.bonecp; -import com.jolbox.bonecp.BoneCP; -import com.jolbox.bonecp.BoneCPConfig; import java.sql.Connection; import java.sql.SQLException; +import java.util.Map; + import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.jdbc.ConnectionProvider; +import org.apache.qpid.server.util.MapValueConverter; + +import com.jolbox.bonecp.BoneCP; +import com.jolbox.bonecp.BoneCPConfig; public class BoneCPConnectionProvider implements ConnectionProvider { + public static final String PARTITION_COUNT = "partitionCount"; + public static final String MAX_CONNECTIONS_PER_PARTITION = "maxConnectionsPerPartition"; + public static final String MIN_CONNECTIONS_PER_PARTITION = "minConnectionsPerPartition"; + public static final int DEFAULT_MIN_CONNECTIONS_PER_PARTITION = 5; public static final int DEFAULT_MAX_CONNECTIONS_PER_PARTITION = 10; public static final int DEFAULT_PARTITION_COUNT = 4; + private final BoneCP _connectionPool; - public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost) throws SQLException + public BoneCPConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly) throws SQLException { BoneCPConfig config = new BoneCPConfig(); config.setJdbcUrl(connectionUrl); - - config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, "minConnectionsPerPartition", DEFAULT_MIN_CONNECTIONS_PER_PARTITION)); - config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, "maxConnectionsPerPartition", DEFAULT_MAX_CONNECTIONS_PER_PARTITION)); - config.setPartitionCount(getIntegerAttribute(virtualHost, "partitionCount",DEFAULT_PARTITION_COUNT)); + if (configStoreOnly) + { + config.setMinConnectionsPerPartition(getIntegerAttribute(virtualHost, MIN_CONNECTIONS_PER_PARTITION, DEFAULT_MIN_CONNECTIONS_PER_PARTITION)); + config.setMaxConnectionsPerPartition(getIntegerAttribute(virtualHost, MAX_CONNECTIONS_PER_PARTITION, DEFAULT_MAX_CONNECTIONS_PER_PARTITION)); + config.setPartitionCount(getIntegerAttribute(virtualHost, PARTITION_COUNT,DEFAULT_PARTITION_COUNT)); + } + else + { + Map<String, Object> storeSettings = virtualHost.getMessageStoreSettings(); + config.setMinConnectionsPerPartition(MapValueConverter.getIntegerAttribute(MIN_CONNECTIONS_PER_PARTITION, storeSettings, DEFAULT_MIN_CONNECTIONS_PER_PARTITION)); + config.setMaxConnectionsPerPartition(MapValueConverter.getIntegerAttribute(MAX_CONNECTIONS_PER_PARTITION, storeSettings, DEFAULT_MAX_CONNECTIONS_PER_PARTITION)); + config.setPartitionCount(MapValueConverter.getIntegerAttribute(PARTITION_COUNT, storeSettings,DEFAULT_PARTITION_COUNT)); + } _connectionPool = new BoneCP(config); } diff --git a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java index b95ad1166c..f12e7f35e6 100644 --- a/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java +++ b/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/org/apache/qpid/server/store/jdbc/bonecp/BoneCPConnectionProviderFactory.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store.jdbc.bonecp; import java.sql.SQLException; + import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.jdbc.ConnectionProvider; @@ -34,9 +35,9 @@ public class BoneCPConnectionProviderFactory implements JDBCConnectionProviderFa } @Override - public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost) + public ConnectionProvider getConnectionProvider(String connectionUrl, VirtualHost virtualHost, boolean configStoreOnly) throws SQLException { - return new BoneCPConnectionProvider(connectionUrl, virtualHost); + return new BoneCPConnectionProvider(connectionUrl, virtualHost, configStoreOnly); } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java index 8fc7de12d0..cc7fda4c82 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/DefaultConnectionProviderFactory.java @@ -34,7 +34,7 @@ public class DefaultConnectionProviderFactory implements JDBCConnectionProviderF @Override public ConnectionProvider getConnectionProvider(String connectionUrl, - VirtualHost virtualHost) + VirtualHost virtualHost, boolean configStoreOnly) { return new DefaultConnectionProvider(connectionUrl); } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 621759ef85..d8fb124c69 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; + import org.apache.log4j.Logger; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; @@ -37,6 +38,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.util.MapValueConverter; /** * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence @@ -48,10 +50,14 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag private static final Logger _logger = Logger.getLogger(JDBCMessageStore.class); - public static final String TYPE = "JDBC"; public static final String CONNECTION_URL = "connectionURL"; public static final String CONFIG_CONNECTION_URL = "configConnectionURL"; + public static final String CONNECTION_POOL = "connectionPool"; + public static final String JDBC_BIG_INT_TYPE = "jdbcBigIntType"; + public static final String JDBC_BYTES_FOR_BLOB = "jdbcBytesForBlob"; + public static final String JDBC_VARBINARY_TYPE = "jdbcVarbinaryType"; + public static final String JDBC_BLOB_TYPE = "jdbcBlobType"; protected String _connectionURL; private ConnectionProvider _connectionProvider; @@ -280,19 +286,24 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag VirtualHost virtualHost) throws ClassNotFoundException, SQLException { + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); String connectionURL; - if(!isConfigStoreOnly()) + Object poolAttribute = null; + boolean configStoreOnly = isConfigStoreOnly(); + if(!configStoreOnly) { - connectionURL = virtualHost.getAttribute(CONNECTION_URL) == null - ? String.valueOf(virtualHost.getAttribute(VirtualHost.STORE_PATH)) - : String.valueOf(virtualHost.getAttribute(CONNECTION_URL)); + connectionURL = messageStoreSettings.get(CONNECTION_URL) == null + ? String.valueOf(messageStoreSettings.get(MessageStore.STORE_PATH)) + : String.valueOf(messageStoreSettings.get(CONNECTION_URL)); + poolAttribute = messageStoreSettings.get(CONNECTION_POOL); } else { connectionURL = virtualHost.getAttribute(CONFIG_CONNECTION_URL) == null ? String.valueOf(virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH)) : String.valueOf(virtualHost.getAttribute(CONFIG_CONNECTION_URL)); + poolAttribute = virtualHost.getAttribute(CONNECTION_POOL); } JDBCDetails details = null; @@ -312,8 +323,6 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag details = DERBY_DETAILS; } - - Object poolAttribute = virtualHost.getAttribute("connectionPool"); String connectionPoolType = poolAttribute == null ? "DEFAULT" : String.valueOf(poolAttribute); JDBCConnectionProviderFactory connectionProviderFactory = @@ -324,12 +333,22 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag connectionProviderFactory = new DefaultConnectionProviderFactory(); } - _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost); + _connectionProvider = connectionProviderFactory.getConnectionProvider(connectionURL, virtualHost, configStoreOnly); - _blobType = getStringAttribute(virtualHost, "jdbcBlobType",details.getBlobType()); - _varBinaryType = getStringAttribute(virtualHost, "jdbcVarbinaryType",details.getVarBinaryType()); - _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, "jdbcBytesForBlob",details.isUseBytesMethodsForBlob()); - _bigIntType = getStringAttribute(virtualHost, "jdbcBigIntType", details.getBigintType()); + if(!configStoreOnly) + { + _blobType = MapValueConverter.getStringAttribute(JDBC_BLOB_TYPE, messageStoreSettings, details.getBlobType()); + _varBinaryType = MapValueConverter.getStringAttribute(JDBC_VARBINARY_TYPE, messageStoreSettings, details.getVarBinaryType()); + _useBytesMethodsForBlob = MapValueConverter.getBooleanAttribute(JDBC_BYTES_FOR_BLOB, messageStoreSettings, details.isUseBytesMethodsForBlob()); + _bigIntType = MapValueConverter.getStringAttribute(JDBC_BIG_INT_TYPE, messageStoreSettings, details.getBigintType()); + } + else + { + _blobType = getStringAttribute(virtualHost, JDBC_BLOB_TYPE,details.getBlobType()); + _varBinaryType = getStringAttribute(virtualHost, JDBC_VARBINARY_TYPE,details.getVarBinaryType()); + _useBytesMethodsForBlob = getBooleanAttribute(virtualHost, JDBC_BYTES_FOR_BLOB,details.isUseBytesMethodsForBlob()); + _bigIntType = getStringAttribute(virtualHost, JDBC_BIG_INT_TYPE, details.getBigintType()); + } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java index d22fc21b74..0a8f682f16 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java @@ -52,18 +52,17 @@ public class JDBCMessageStoreFactory implements MessageStoreFactory, DurableConf @Override public void validateAttributes(Map<String, Object> attributes) { - if(getType().equals(attributes.get(VirtualHost.STORE_TYPE))) + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.MESSAGE_STORE_SETTINGS); + + if(getType().equals(messageStoreSettings.get(MessageStore.STORE_TYPE))) { - Object connectionURL = attributes.get(JDBCMessageStore.CONNECTION_URL); + Object connectionURL = messageStoreSettings.get(JDBCMessageStore.CONNECTION_URL); if(!(connectionURL instanceof String)) { - Object storePath = attributes.get(VirtualHost.STORE_PATH); - if(!(storePath instanceof String)) - { - throw new IllegalArgumentException("Attribute '"+ JDBCMessageStore.CONNECTION_URL - +"' is required and must be of type String."); + throw new IllegalArgumentException("Setting '"+ JDBCMessageStore.CONNECTION_URL + +"' is required and must be of type String."); - } } } if(getType().equals(attributes.get(VirtualHost.CONFIG_STORE_TYPE))) diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index 9c348383c6..65bf795045 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.store.jdbc; +import java.io.File; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.qpid.server.model.VirtualHost; @@ -66,10 +69,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; - - when(virtualHost.getAttribute(eq("connectionURL"))).thenReturn(_connectionURL); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL); + when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); } + @Override protected MessageStore createMessageStore() { diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json index 60f3f7f174..dac8ee4fd4 100644 --- a/qpid/java/systests/etc/config-systests.json +++ b/qpid/java/systests/etc/config-systests.json @@ -22,7 +22,7 @@ "name": "Broker", "defaultVirtualHost" : "test", "storeVersion": 1, - "modelVersion": "1.3", + "modelVersion": "1.4", "authenticationproviders" : [ { "name" : "plain", "type" : "PlainPasswordFile", @@ -60,7 +60,9 @@ "virtualhosts" : [ { "name" : "test", "type" : "STANDARD", - "storeType": "${messagestore.type}", - "storePath" : "${QPID_WORK}/test" + "messageStoreSettings": { + "storeType": "${messagestore.type}", + "storePath" : "${QPID_WORK}/test/${test.port}" + } } ] } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 9dc981a358..36e86fbe7b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -115,9 +115,12 @@ public class MessageStoreTest extends QpidTestCase String hostName = getName(); _storePath = System.getProperty("QPID_WORK", TMP_FOLDER + File.separator + getTestName()) + File.separator + hostName; + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, _storePath); + messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); + _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); - when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath); - when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(getTestProfileMessageStoreType()); + when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE); when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName); when(_virtualHostModel.getType()).thenReturn(StandardVirtualHostFactory.TYPE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 9bc1a57261..7017ea6d45 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -50,14 +51,15 @@ public class @Override public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) { - Object overfullAttr = virtualHost.getAttribute(MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? Long.MAX_VALUE : overfullAttr instanceof Number ? ((Number)overfullAttr).longValue() : Long.parseLong(overfullAttr.toString()); - Object underfullAttr = virtualHost.getAttribute(MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE); + Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); _persistentSizeLowThreshold = overfullAttr == null ? _persistentSizeHighThreshold diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 6b03151f29..cc3e7574b8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -21,54 +21,71 @@ package org.apache.qpid.server.store; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageStoreFactory; -import java.nio.ByteBuffer; -import java.util.HashMap; - public class SlowMessageStore implements MessageStore, DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(SlowMessageStore.class); + + public static final String TYPE = "SLOW"; + public static final String DELAYS = "delays"; + public static final String REAL_STORE = "realStore"; + + private static final String DEFAULT_DELAY = "default"; + private static final String PRE = "pre"; + private static final String POST = "post"; + private HashMap<String, Long> _preDelays = new HashMap<String, Long>(); private HashMap<String, Long> _postDelays = new HashMap<String, Long>(); private long _defaultDelay = 0L; private MessageStore _realStore = null; private DurableConfigurationStore _durableConfigurationStore = null; - private static final String PRE = "pre"; - private static final String POST = "post"; - public static final String TYPE = "SLOW"; - private String DEFAULT_DELAY = "default"; + + private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>(); // ***** MessageStore Interface. + @Override public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) { _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); - Object delaysAttr = virtualHost.getAttribute("slowMessageStoreDelays"); + Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + Object delaysAttr = messageStoreSettings.get(DELAYS); @SuppressWarnings({ "unchecked" }) Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap(); configureDelays(delays); - final Object realStoreAttr = virtualHost.getAttribute("realStore"); + final Object realStoreAttr = messageStoreSettings.get(REAL_STORE); String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString(); - if (delays.containsKey(DEFAULT_DELAY)) { _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); } _realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore(); - + + if (!_eventListeners.isEmpty()) + { + for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();) + { + Map.Entry<EventListener, Event[]> entry = it.next(); + _realStore.addEventListener(entry.getKey(), entry.getValue()); + it.remove(); + } + } + if (_realStore instanceof DurableConfigurationStore) { _durableConfigurationStore = (DurableConfigurationStore)_realStore; @@ -141,13 +158,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } } - + @Override public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler) { _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); } + @Override public void close() { doPreDelay("close"); @@ -155,12 +173,12 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("close"); } + @Override public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData) { return _realStore.addMessage(metaData); } - @Override public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException { @@ -210,6 +228,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("update"); } + @Override public Transaction newTransaction() { doPreDelay("beginTran"); @@ -218,27 +237,12 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore return txn; } - + @Override public boolean isPersistent() { return _realStore.isPersistent(); } - public void storeMessageHeader(Long messageNumber, ServerMessage message) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void storeContent(Long messageNumber, long offset, ByteBuffer body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public ServerMessage getMessage(Long messageNumber) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - private class SlowTransaction implements Transaction { private final Transaction _underlying; @@ -248,6 +252,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore _underlying = underlying; } + @Override public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { doPreDelay("enqueueMessage"); @@ -255,6 +260,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("enqueueMessage"); } + @Override public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) { doPreDelay("dequeueMessage"); @@ -262,6 +268,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("dequeueMessage"); } + @Override public void commitTran() { doPreDelay("commitTran"); @@ -269,6 +276,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("commitTran"); } + @Override public StoreFuture commitTranAsync() { doPreDelay("commitTran"); @@ -277,6 +285,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore return future; } + @Override public void abortTran() { doPreDelay("abortTran"); @@ -284,11 +293,13 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore doPostDelay("abortTran"); } + @Override public void removeXid(long format, byte[] globalId, byte[] branchId) { _underlying.removeXid(format, globalId, branchId); } + @Override public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) { _underlying.recordXid(format, globalId, branchId, enqueues, dequeues); @@ -304,7 +315,14 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore @Override public void addEventListener(EventListener eventListener, Event... events) { - _realStore.addEventListener(eventListener, events); + if (_realStore == null) + { + _eventListeners .put(eventListener, events); + } + else + { + _realStore.addEventListener(eventListener, events); + } } @Override diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java index eca91c2b5e..3dea6fd5bb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.store; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -62,10 +64,13 @@ public class StoreOverfullTest extends QpidBrokerTestCase public void setUp() throws Exception { + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_TYPE, QuotaMessageStore.TYPE); + messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); + messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); + TestBrokerConfiguration config = getBrokerConfiguration(); - config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_TYPE, QuotaMessageStore.TYPE); - config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, MessageStoreConstants.OVERFULL_SIZE_ATTRIBUTE, OVERFULL_SIZE); - config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, MessageStoreConstants.UNDERFULL_SIZE_ATTRIBUTE, UNDERFULL_SIZE); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); super.setUp(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index 18774941e8..0d76f6c444 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -31,7 +31,18 @@ import java.util.Map; import javax.jms.JMSException; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AbstractConfiguredObject; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -50,10 +61,10 @@ public class Asserts ConfiguredObject.LAST_UPDATED_TIME, ConfiguredObject.DESCRIPTION, VirtualHost.SUPPORTED_QUEUE_TYPES, - VirtualHost.STORE_PATH, VirtualHost.TYPE, VirtualHost.CONFIG_STORE_PATH, VirtualHost.CONFIG_STORE_TYPE, + VirtualHost.CONFIGURATION_STORE_SETTINGS, VirtualHost.SECURITY_ACL); assertEquals("Unexpected value of attribute " + VirtualHost.NAME, @@ -146,7 +157,7 @@ public class Asserts @SuppressWarnings("unchecked") Map<String, Object> statistics = (Map<String, Object>) queueData.get(Asserts.STATISTICS_ATTRIBUTE); - Asserts.assertAttributesPresent(statistics, + Asserts.assertAttributesPresent(statistics, "bindingCount", "consumerCount", "consumerCountWithCredit", @@ -226,7 +237,7 @@ public class Asserts @SuppressWarnings("unchecked") Map<String, Object> statistics = (Map<String, Object>) connectionData.get(STATISTICS_ATTRIBUTE); - + assertAttributesPresent(statistics, "bytesIn", diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java index 49e07e92e8..7050dcfc33 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java @@ -524,7 +524,7 @@ public class RestTestHelper String queueName = EXPECTED_QUEUES[i]; Map<String, Object> queueData = new HashMap<String, Object>(); queueData.put(Queue.NAME, queueName); - queueData.put(Queue.DURABLE, Boolean.TRUE); + queueData.put(Queue.DURABLE, Boolean.FALSE); submitRequest("/rest/queue/test/" + queueName, "PUT", queueData); Map<String, Object> bindingData = new HashMap<String, Object>(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 6bc515dcef..1ae1be3101 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.util.FileUtils; import org.codehaus.jackson.JsonGenerationException; @@ -111,7 +112,10 @@ public class VirtualHostRestTest extends QpidRestTestCase restartBroker(); Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostName); Asserts.assertVirtualHost(hostName, hostDetails); - assertEquals("Unexpected store type", storeType, hostDetails.get(VirtualHost.STORE_TYPE)); + + @SuppressWarnings("unchecked") + Map<String, Object> messageStoreSettings = (Map<String, Object>) hostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS); + assertEquals("Unexpected store type", storeType, messageStoreSettings.get(MessageStore.STORE_TYPE)); assertNewVirtualHost(hostDetails); } @@ -155,23 +159,30 @@ public class VirtualHostRestTest extends QpidRestTestCase String hostToUpdate = TEST3_VIRTUALHOST; Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate); Asserts.assertVirtualHost(hostToUpdate, hostDetails); - String configPath = (String)hostDetails.get(VirtualHost.STORE_PATH); + @SuppressWarnings("unchecked") + Map<String, Object> attributes = (Map<String, Object>)hostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS); + String configPath = (String) attributes.get(MessageStore.STORE_PATH); String storeType = getTestProfileMessageStoreType(); String storeLocation = getStoreLocation(hostToUpdate); + Map<String, Object> newMessageStoreSettings = new HashMap<String, Object>(); + newMessageStoreSettings.put(MessageStore.STORE_TYPE, storeType); + newMessageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); + Map<String, Object> newAttributes = new HashMap<String, Object>(); newAttributes.put(VirtualHost.NAME, hostToUpdate); - newAttributes.put(VirtualHost.STORE_TYPE, storeType); - newAttributes.put(VirtualHost.STORE_PATH, storeLocation); + newAttributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, newMessageStoreSettings); int response = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostToUpdate, "PUT", newAttributes); assertEquals("Unexpected response code", 409, response); restartBroker(); - hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate); - Asserts.assertVirtualHost(hostToUpdate, hostDetails); - assertEquals("Unexpected config path", configPath, hostDetails.get(VirtualHost.STORE_PATH)); + Map<String, Object> rereadHostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostToUpdate); + Asserts.assertVirtualHost(hostToUpdate, rereadHostDetails); + @SuppressWarnings("unchecked") + Map<String, Object> rereadMessageStoreSettings = (Map<String,Object>)rereadHostDetails.get(VirtualHost.MESSAGE_STORE_SETTINGS); + assertEquals("Unexpected config path", configPath, rereadMessageStoreSettings.get(MessageStore.STORE_PATH)); } public void testPutCreateQueue() throws Exception @@ -524,12 +535,15 @@ public class VirtualHostRestTest extends QpidRestTestCase private int tryCreateVirtualHost(String hostName, String storeType, String storePath, String configPath) throws IOException, JsonGenerationException, JsonMappingException { + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, storePath); + messageStoreSettings.put(MessageStore.STORE_TYPE, storeType); + Map<String, Object> hostData = new HashMap<String, Object>(); hostData.put(VirtualHost.NAME, hostName); hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - hostData.put(VirtualHost.STORE_PATH, storePath); - hostData.put(VirtualHost.STORE_TYPE, storeType); + hostData.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index b2119ff79f..ec389e55f1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.security.acl.AbstractACLTestCase; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory; import org.apache.qpid.server.security.group.FileGroupManagerFactory; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -976,10 +977,13 @@ public class BrokerACLTest extends QpidRestTestCase private int createHost(String hostName) throws Exception { + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, getStoreLocation(hostName)); + messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); + Map<String, Object> hostData = new HashMap<String, Object>(); hostData.put(VirtualHost.NAME, hostName); - hostData.put(VirtualHost.STORE_PATH, getStoreLocation(hostName)); - hostData.put(VirtualHost.STORE_TYPE, getTestProfileMessageStoreType()); + hostData.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); hostData.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); return getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java index 85e62f8dae..4026b7a6cb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java @@ -23,13 +23,6 @@ package org.apache.qpid.test.client.timeouts; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.SlowMessageStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -38,6 +31,14 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SlowMessageStore; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * This tests that when the commit takes a long time(due to POST_COMMIT_DELAY) that the commit does not timeout * This test must be run in conjunction with SyncWaiteTimeoutDelay or be run with POST_COMMIT_DELAY > 30s to ensure @@ -59,9 +60,14 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase { Map<String, Object> slowMessageStoreDelays = new HashMap<String,Object>(); slowMessageStoreDelays.put("postcommitTran", POST_COMMIT_DELAY); + + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_TYPE, SlowMessageStore.TYPE); + messageStoreSettings.put(SlowMessageStore.DELAYS, slowMessageStoreDelays); + TestBrokerConfiguration config = getBrokerConfiguration(); - config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_TYPE, SlowMessageStore.TYPE); - config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, "slowMessageStoreDelays", slowMessageStoreDelays); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); + super.setUp(); //Set the syncWrite timeout to be just larger than the delay on the commitTran. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index ace34506bd..93cf90829d 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -64,6 +64,8 @@ import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.FileUtils; @@ -233,7 +235,16 @@ public class QpidBrokerTestCase extends QpidTestCase configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT, Port.PORT, actualPort); configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_RMI_PORT, Port.PORT, getManagementPort(actualPort)); configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_JMX_PORT, Port.PORT, getManagementPort(actualPort) + JMXPORT_CONNECTORSERVER_OFFSET); + + String workDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator + actualPort; + Map<String, Object> virtualHostSettings = configuration.getObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); + + @SuppressWarnings("unchecked") + Map<String, Object> storeSettings = (Map<String, Object>)virtualHostSettings.get(VirtualHost.MESSAGE_STORE_SETTINGS); + storeSettings.put(MessageStore.STORE_PATH, workDir); + configuration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, storeSettings); } + return configuration; } @@ -433,10 +444,10 @@ public class QpidBrokerTestCase extends QpidTestCase } Set<Integer> portsUsedByBroker = guessAllPortsUsedByBroker(port); + String testConfig = saveTestConfiguration(port, testConfiguration); if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker()) { - String testConfig = saveTestConfiguration(port, testConfiguration); setSystemProperty(BrokerProperties.PROPERTY_USE_CUSTOM_RMI_SOCKET_FACTORY, "false"); BrokerOptions options = new BrokerOptions(); @@ -460,9 +471,6 @@ public class QpidBrokerTestCase extends QpidTestCase } else if (!_brokerType.equals(BrokerType.EXTERNAL)) { - String workDir = System.getProperty("QPID_WORK") + File.separator + "work" + File.separator + port; - testConfiguration.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.STORE_PATH, workDir); - String testConfig = saveTestConfiguration(port, testConfiguration); // Add the port to QPID_WORK to ensure unique working dirs for multi broker tests final String qpidWork = getQpidWork(_brokerType, port); @@ -834,7 +842,7 @@ public class QpidBrokerTestCase extends QpidTestCase { storeDir = ":memory:"; } - else if (!"Memory".equals(storeType)) + else if (!MemoryMessageStore.TYPE.equals(storeType)) { storeDir = "${QPID_WORK}" + File.separator + virtualHostName + File.separator + brokerPort; } @@ -843,8 +851,10 @@ public class QpidBrokerTestCase extends QpidTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(VirtualHost.NAME, virtualHostName); attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE); - attributes.put(VirtualHost.STORE_TYPE, storeType); - attributes.put(VirtualHost.STORE_PATH, storeDir); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_TYPE, storeType); + messageStoreSettings.put(MessageStore.STORE_PATH, storeDir); + attributes.put(VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings ); int port = getPort(brokerPort); getBrokerConfiguration(port).addVirtualHostConfiguration(attributes); } |