summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-25 10:07:21 +0000
committerKeith Wall <kwall@apache.org>2014-03-25 10:07:21 +0000
commitfcc3f654b60b7dd2180afe73e8809545725b41af (patch)
treeef5f631412fdbfd5a6ff4232782e90692e3767c1
parentb0e9d446fd5edc23267e2aa924a703749bdb95df (diff)
downloadqpid-python-fcc3f654b60b7dd2180afe73e8809545725b41af.tar.gz
Introduction of separate lifecycle methods on stores for open/close/recover.
Change Upgraders responsibility to create the amqp standard exchanges when upgrading from earlier store versions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1581288 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java32
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java213
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java42
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java11
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java204
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java37
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java89
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java14
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java146
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java36
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java102
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java14
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java136
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java85
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java33
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java20
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java40
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java13
-rw-r--r--qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java10
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java27
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java13
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java125
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java130
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes1
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes1
36 files changed, 968 insertions, 791 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index e41b81f846..7e42d09ba6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -75,18 +75,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT);
_messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
-
- _messageStore.configureConfigStore(
- virtualHost, configRecoverer
+ _messageStore.openConfigurationStore(
+ virtualHost.getName(), virtualHost.getMessageStoreSettings()
);
- _messageStore.configureMessageStore(
- virtualHost, recoveryHandler,
- recoveryHandler
+ _messageStore.openMessageStore(
+ virtualHost.getName(), virtualHost.getMessageStoreSettings()
);
// Make the virtualhost model object a replication group listener
@@ -105,7 +99,8 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
try
{
_inVhostInitiatedClose = true;
- getMessageStore().close();
+ _messageStore.closeMessageStore();
+ _messageStore.closeConfigurationStore();
}
catch (Exception e)
{
@@ -214,7 +209,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Received BDB event indicating transition to state " + state
- + " when current message store state is " + _messageStore._stateManager.getState());
+ + " when current message store state is " + _messageStore._messageStoreStateManager.getState());
}
switch (state)
@@ -243,7 +238,14 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
try
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- _messageStore.activate();
+
+ DurableConfigurationRecoverer configRecoverer =
+ new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger());
+ _messageStore.recoverConfigurationStore(configRecoverer);
+
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this);
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
}
catch (Exception e)
{
@@ -256,9 +258,9 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
try
{
//TODO: move this this into the store method passivate()
- if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
+ if (_messageStore._messageStoreStateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
{
- _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
+ _messageStore._messageStoreStateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
}
}
catch (Exception e)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 8be037eb7b..2022f36bd9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -30,12 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -102,25 +100,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+
+ //TODO: Add upgrader to remove BRIDGES and LINKS
private static String BRIDGEDB_NAME = "BRIDGES";
private static String LINKDB_NAME = "LINKS";
private static String XID_DB_NAME = "XIDS";
private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION";
- private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME,
- MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME };
-
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME };
+ private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME };
private EnvironmentFacade _environmentFacade;
private final AtomicLong _messageId = new AtomicLong(0);
- protected final StateManager _stateManager;
-
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
- private ConfigurationRecoveryHandler _configRecoveryHandler;
+ protected final StateManager _messageStoreStateManager;
+ private final StateManager _configurationStoreStateManager;
private long _totalStoreSize;
private boolean _limitBusted;
@@ -129,12 +122,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private final EventManager _eventManager = new EventManager();
private final String _type;
- private VirtualHost _virtualHost;
private final EnvironmentFacadeFactory _environmentFacadeFactory;
private volatile Committer _committer;
+ private String _virtualHostName;
+
+
public BDBMessageStore()
{
this(new StandardEnvironmentFacadeFactory());
@@ -144,7 +139,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
_type = environmentFacadeFactory.getType();
_environmentFacadeFactory = environmentFacadeFactory;
- _stateManager = new StateManager(_eventManager);
+ _messageStoreStateManager = new StateManager(_eventManager);
+ _configurationStoreStateManager = new StateManager(new EventManager());
}
@Override
@@ -154,94 +150,47 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _stateManager.attainState(State.INITIALISING);
+ _configurationStoreStateManager.attainState(State.INITIALISING);
- _configRecoveryHandler = recoveryHandler;
- _virtualHost = virtualHost;
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException
- {
- if(_stateManager.isInState(State.INITIAL))
+ _virtualHostName = virtualHostName;
+ if (_environmentFacade == null)
{
- // Is acting as a message store, but not a durable config store
- _stateManager.attainState(State.INITIALISING);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, storeSettings);
}
-
- _messageRecoveryHandler = messageRecoveryHandler;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _virtualHost = virtualHost;
-
-
- completeInitialisation();
+ _configurationStoreStateManager.attainState(State.INITIALISED);
}
- private void completeInitialisation() throws StoreException
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
- configure(_virtualHost, _messageRecoveryHandler != null);
+ _configurationStoreStateManager.attainState(State.ACTIVATING);
- _stateManager.attainState(State.INITIALISED);
- }
-
- private void startActivation() throws StoreException
- {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
try
{
- new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES);
- _totalStoreSize = getSizeOnDisk();
+ new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary();
+ _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES);
}
catch(DatabaseException e)
{
throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
}
+ recoverConfig(recoveryHandler);
+ _configurationStoreStateManager.attainState(State.ACTIVE);
}
@Override
- public synchronized void activate() throws StoreException
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException
{
- // check if acting as a durable config store, but not a message store
- if(_stateManager.isInState(State.INITIALISING))
- {
- completeInitialisation();
- }
-
- _stateManager.attainState(State.ACTIVATING);
- startActivation();
-
- if(_configRecoveryHandler != null)
- {
- recoverConfig(_configRecoveryHandler);
- }
- if(_messageRecoveryHandler != null)
- {
- recoverMessages(_messageRecoveryHandler);
- }
- if(_tlogRecoveryHandler != null)
- {
- recoverQueueEntries(_tlogRecoveryHandler);
- }
+ _messageStoreStateManager.attainState(State.INITIALISING);
- _stateManager.attainState(State.ACTIVE);
- }
+ _virtualHostName = virtualHostName;
- @Override
- public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
- {
- return new BDBTransaction();
- }
-
- private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException
- {
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE);
@@ -250,16 +199,56 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold :
underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString());
-
if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore);
+ if (_environmentFacade == null)
+ {
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, messageStoreSettings);
+ }
- _committer = _environmentFacade.createCommitter(virtualHost.getName());
+ _committer = _environmentFacade.createCommitter(_virtualHostName);
_committer.start();
+
+ _messageStoreStateManager.attainState(State.INITIALISED);
+ }
+
+ @Override
+ public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
+ {
+ _messageStoreStateManager.attainState(State.ACTIVATING);
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ try
+ {
+ new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary();
+ _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES);
+ _totalStoreSize = getSizeOnDisk();
+ }
+ catch(DatabaseException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot activate message store", e);
+ }
+
+ if(messageRecoveryHandler != null)
+ {
+ recoverMessages(messageRecoveryHandler);
+ }
+ if(transactionLogRecoveryHandler != null)
+ {
+ recoverQueueEntries(transactionLogRecoveryHandler);
+ }
+
+ _messageStoreStateManager.attainState(State.ACTIVE);
+ }
+
+ @Override
+ public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
+ {
+ return new BDBTransaction();
}
@Override
@@ -283,35 +272,59 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
* @throws Exception If the close fails.
*/
@Override
- public void close() throws StoreException
+ public void closeMessageStore() throws StoreException
{
- if (_closed.compareAndSet(false, true))
+ _messageStoreStateManager.attainState(State.CLOSING);
+ try
{
- _stateManager.attainState(State.CLOSING);
- try
+ if (_committer != null)
{
- try
- {
- _committer.stop();
- }
- finally
- {
- closeEnvironment();
- }
+ _committer.stop();
}
- catch(DatabaseException e)
+ }
+ finally
+ {
+ if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL))
{
- throw new StoreException("Exception occured on message store close", e);
+ closeEnvironment();
}
- _stateManager.attainState(State.CLOSED);
}
+ _messageStoreStateManager.attainState(State.CLOSED);
+ }
+
+ @Override
+ public void closeConfigurationStore() throws StoreException
+ {
+ _configurationStoreStateManager.attainState(State.CLOSING);
+ try
+ {
+ if (_committer != null)
+ {
+ _committer.stop();
+ }
+ }
+ finally
+ {
+ if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL))
+ {
+ closeEnvironment();
+ }
+ }
+ _configurationStoreStateManager.attainState(State.CLOSED);
}
private void closeEnvironment()
{
if (_environmentFacade != null)
{
- _environmentFacade.close();
+ try
+ {
+ _environmentFacade.close();
+ }
+ catch(DatabaseException e)
+ {
+ throw new StoreException("Exception occured on message store close", e);
+ }
}
}
@@ -704,7 +717,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
storeConfiguredObjectEntry(configuredObject);
@@ -774,7 +787,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Updating " +type + ", id: " + id);
+ LOGGER.debug("Updating " + type + ", id: " + id);
}
try
@@ -1286,7 +1299,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*/
private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
LOGGER.debug("Storing configured object: " + configuredObject);
DatabaseEntry key = new DatabaseEntry();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index d242790efb..fd064d9b0e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
public interface EnvironmentFacadeFactory
{
public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore);
+ EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> storeSettings);
String getType();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index 75e14a70c7..cc38b799a6 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -34,38 +32,19 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- String name = virtualHost.getName();
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name;
+ final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + virtualHostName;
- String storeLocation;
- if(isMessageStore)
+ Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
+ if (environmentConfigurationAttributes instanceof Map)
{
- Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
- if (environmentConfigurationAttributes instanceof Map)
- {
- envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
- }
-
- storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
- }
- else // we are acting only as the durable config store
- {
- Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
-
- Object environmentConfigurationAttributes = configurationStoreSettings.get(ENVIRONMENT_CONFIGURATION);
- if (environmentConfigurationAttributes instanceof Map)
- {
- envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
- }
-
- storeLocation = (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
+ envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
+ String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
if(storeLocation == null)
{
storeLocation = defaultPath;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 4df62b1d0f..c6b3e48cf8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Map;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
@@ -48,9 +47,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
private static final boolean DEFAULT_COALESCING_SYNC = true;
@Override
- public EnvironmentFacade createEnvironmentFacade(VirtualHost<?> virtualHost, boolean isMessageStore)
+ public EnvironmentFacade createEnvironmentFacade(String virtualHostName, final Map<String, Object> messageStoreSettings)
{
- final Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
index bfe41773eb..e1678e6f65 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java
@@ -25,49 +25,9 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase
{
- private BDBMessageStore _bdbMessageStore;
-
- @Override
- protected BDBMessageStore createMessageStore() throws Exception
- {
- createStoreIfNecessary();
- return _bdbMessageStore;
- }
-
- @Override
- protected void closeMessageStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
@Override
protected DurableConfigurationStore createConfigStore() throws Exception
{
- createStoreIfNecessary();
-
- return _bdbMessageStore;
- }
-
- @Override
- protected void closeConfigStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
- private void createStoreIfNecessary()
- {
- if(_bdbMessageStore == null)
- {
- _bdbMessageStore = new BDBMessageStore();
- }
- }
-
- private void closeStoreIfNecessary() throws Exception
- {
- if (_bdbMessageStore != null)
- {
- _bdbMessageStore.close();
- _bdbMessageStore = null;
- }
+ return new BDBMessageStore();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
index 65830fd1c2..f2de01445d 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
@@ -20,15 +20,11 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
@@ -62,20 +58,17 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB
@Override
- protected VirtualHost<?> createVirtualHost(String storeLocation)
+ protected Map<String, Object>createStoreSettings(String storeLocation)
{
_logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
- VirtualHost<?> vhost = mock(VirtualHost.class);
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
Map<String,String> envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE);
messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap);
- when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
- when(vhost.getName()).thenReturn("test");
- return vhost;
+ return messageStoreSettings;
}
@Override
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 835deb4a4c..c8fcfe0826 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -198,7 +199,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
String returnedPayloadString_0_10 = new String(recoveredContent.array());
assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
- readOnlyStore.close();
+ readOnlyStore.closeMessageStore();
}
private DeliveryProperties createDeliveryProperties_0_10()
@@ -233,15 +234,16 @@ public class BDBMessageStoreTest extends MessageStoreTest
*/
private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
{
- messageStore.close();
+ messageStore.closeMessageStore();
BDBMessageStore newStore = new BDBMessageStore();
MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
- newStore.configureMessageStore(getVirtualHostModel(), recoveryHandler, null);
+ VirtualHost<?> virtualHost = getVirtualHostModel();
+ newStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings());
- newStore.activate();
+ newStore.recoverMessageStore(recoveryHandler, null);
return newStore;
}
@@ -521,7 +523,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
File location = new File(storeLocation);
assertTrue("Store does not exist at " + storeLocation, location.exists());
- bdbStore.close();
+ bdbStore.closeMessageStore();
assertTrue("Store does not exist at " + storeLocation, location.exists());
bdbStore.onDelete();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index f4072277d5..6e8932d3ba 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -57,7 +57,7 @@ public interface VirtualHost<X extends VirtualHost<X>> extends ConfiguredObject<
String CONFIGURATION_STORE_SETTINGS = "configurationStoreSettings";
String MESSAGE_STORE_SETTINGS = "messageStoreSettings";
- int CURRENT_CONFIG_VERSION = 4;
+ int CURRENT_CONFIG_VERSION = 5;
@ManagedAttribute
Collection<String> getSupportedExchangeTypes();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 81c74fe8f5..a6424a3d28 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -37,15 +37,16 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.codehaus.jackson.JsonGenerationException;
@@ -79,13 +80,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
private static final int DEFAULT_CONFIG_VERSION = 0;
- public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
+ public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME));
+ public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, BRIDGES_TABLE_NAME, LINKS_TABLE_NAME, XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
+
private static final int DB_VERSION = 7;
private final AtomicLong _messageId = new AtomicLong(0);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -192,69 +193,90 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected final EventManager _eventManager = new EventManager();
- protected final StateManager _stateManager;
+ protected final StateManager _messageStoreStateManager;
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
- private ConfigurationRecoveryHandler _configRecoveryHandler;
- private VirtualHost _virtualHost;
+ private StateManager _configurationStoreStateManager;
+ private boolean _initialized;
public AbstractJDBCMessageStore()
{
- _stateManager = new StateManager(_eventManager);
+ _messageStoreStateManager = new StateManager(_eventManager);
+ _configurationStoreStateManager = new StateManager(new EventManager());
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _stateManager.attainState(State.INITIALISING);
- _configRecoveryHandler = configRecoveryHandler;
- _virtualHost = virtualHost;
-
+ _configurationStoreStateManager.attainState(State.INITIALISING);
+ initialiseIfNecessary(virtualHostName, storeSettings);
+ _configurationStoreStateManager.attainState(State.INITIALISED);
}
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
{
- if(_stateManager.isInState(State.INITIAL))
+ if (!_initialized)
{
- _stateManager.attainState(State.INITIALISING);
+ try
+ {
+ implementationSpecificConfiguration(virtualHostName, storeSettings);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new StoreException("Cannot find driver class", e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unexpected exception occured", e);
+ }
+ _initialized =true;
}
+ }
- _virtualHost = virtualHost;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _messageRecoveryHandler = recoveryHandler;
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ {
+ _configurationStoreStateManager.attainState(State.ACTIVATING);
- completeInitialisation();
+ try
+ {
+ createOrOpenConfigurationStoreDatabase();
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
+ loadConfiguredObjects(recoveryHandler);
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ _configurationStoreStateManager.attainState(State.ACTIVE);
}
- private void completeInitialisation()
+ @Override
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- commonConfiguration();
-
- _stateManager.attainState(State.INITIALISED);
+ _messageStoreStateManager.attainState(State.INITIALISING);
+ initialiseIfNecessary(virtualHostName, messageStoreSettings);
+ _messageStoreStateManager.attainState(State.INITIALISED);
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- if(_stateManager.isInState(State.INITIALISING))
+ _messageStoreStateManager.attainState(State.ACTIVATING);
+ try
{
- completeInitialisation();
+ createOrOpenMessageStoreDatabase();
+ upgradeIfNecessary();
}
- _stateManager.attainState(State.ACTIVATING);
-
- // this recovers durable exchanges, queues, and bindings
- if(_configRecoveryHandler != null)
+ catch (SQLException e)
{
- recoverConfiguration(_configRecoveryHandler);
+ throw new StoreException("Unable to activate message store ", e);
}
- if(_messageRecoveryHandler != null)
+ if(messageRecoveryHandler != null)
{
try
{
- recoverMessages(_messageRecoveryHandler);
+ recoverMessages(messageRecoveryHandler);
}
catch (SQLException e)
{
@@ -262,11 +284,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
"persistent store ", e);
}
}
- if(_tlogRecoveryHandler != null)
+ if(transactionLogRecoveryHandler != null)
{
try
{
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
recoverXids(dtxrh);
}
catch (SQLException e)
@@ -277,25 +299,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- _stateManager.attainState(State.ACTIVE);
- }
-
- private void commonConfiguration()
- {
- try
- {
- implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
- createOrOpenDatabase();
- upgradeIfNecessary();
- }
- catch (ClassNotFoundException e)
- {
- throw new StoreException("Unable to configure message store ", e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to configure message store ", e);
- }
+ _messageStoreStateManager.attainState(State.ACTIVE);
}
protected void upgradeIfNecessary() throws SQLException
@@ -370,8 +374,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- protected abstract void implementationSpecificConfiguration(String name,
- VirtualHost virtualHost) throws ClassNotFoundException, SQLException;
+ protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException;
abstract protected Logger getLogger();
@@ -381,13 +384,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
abstract protected String getSqlBigIntType();
- protected void createOrOpenDatabase() throws SQLException
+ protected void createOrOpenMessageStoreDatabase() throws SQLException
{
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
- createConfigVersionTable(conn);
- createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
@@ -398,6 +399,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
conn.close();
}
+ protected void createOrOpenConfigurationStoreDatabase() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+
+ createConfigVersionTable(conn);
+ createConfiguredObjectsTable(conn);
+
+ conn.close();
+ }
+
private void createVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(DB_VERSION_TABLE_NAME, conn))
@@ -645,21 +656,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler)
- {
- try
- {
- recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
- loadConfiguredObjects(recoveryHandler);
-
- setConfigVersion(recoveryHandler.completeConfigurationRecovery());
- }
- catch (SQLException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- }
-
private void setConfigVersion(int version) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -722,16 +718,29 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void close()
+ public void closeMessageStore()
{
- if (_closed.compareAndSet(false, true))
- {
- _stateManager.attainState(State.CLOSING);
+ _messageStoreStateManager.attainState(State.CLOSING);
+ if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL))
+ {
doClose();
+ }
+
+ _messageStoreStateManager.attainState(State.CLOSED);
+ }
- _stateManager.attainState(State.CLOSED);
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configurationStoreStateManager.attainState(State.CLOSING);
+
+ if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL))
+ {
+ doClose();
}
+
+ _configurationStoreStateManager.attainState(State.CLOSED);
}
@@ -819,7 +828,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void create(UUID id, String type, Map<String,Object> attributes) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
}
@@ -839,7 +848,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
if (queueConfiguredObject != null)
@@ -1153,11 +1162,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected boolean isConfigStoreOnly()
- {
- return _messageRecoveryHandler == null;
- }
-
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1995,7 +1999,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
try
{
@@ -2141,7 +2145,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE))
{
try
{
@@ -2164,7 +2168,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ if (_configurationStoreStateManager.isInState(State.ACTIVE) || _configurationStoreStateManager.isInState(State.ACTIVATING))
{
try
{
@@ -2405,13 +2409,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn = newAutoCommitConnection();
try
{
- for (String tableName : ALL_TABLES)
+ List<String> tables = new ArrayList<String>();
+ tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
+ tables.addAll(MESSAGE_STORE_TABLE_NAMES);
+
+ for (String tableName : tables)
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("DROP TABLE " + tableName);
}
+ catch(SQLException e)
+ {
+ getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
+ }
finally
{
stmt.close();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index d72bd1a5bf..afdeb257e0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -20,17 +20,15 @@
*/
package org.apache.qpid.server.store;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
abstract public class AbstractMemoryMessageStore extends NullMessageStore
{
private final AtomicLong _messageId = new AtomicLong(1);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
@@ -80,30 +78,26 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _stateManager.attainState(State.INITIALISING);
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
- if(_stateManager.isInState(State.INITIAL))
- {
- _stateManager.attainState(State.INITIALISING);
- }
- _stateManager.attainState(State.INITIALISED);
+
}
@Override
- public void activate()
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
+ _stateManager.attainState(State.INITIALISING);
+ _stateManager.attainState(State.INITIALISED);
+ }
- if(_stateManager.isInState(State.INITIALISING))
- {
- _stateManager.attainState(State.INITIALISED);
- }
+ @Override
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
+ {
_stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
@@ -131,13 +125,10 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
}
@Override
- public void close()
+ public void closeMessageStore()
{
- if (_closed.compareAndSet(false, true))
- {
- _stateManager.attainState(State.CLOSING);
- _stateManager.attainState(State.CLOSED);
- }
+ _stateManager.attainState(State.CLOSING);
+ _stateManager.attainState(State.CLOSED);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 5921abc324..10e56f1f71 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.store;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.VirtualHost;
-
public interface DurableConfigurationStore
{
String STORE_TYPE = "storeType";
@@ -38,16 +36,17 @@ public interface DurableConfigurationStore
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- *
- *
- *
- *
- *
- * @param virtualHost
- * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param virtualHostName host name
+ * @param storeSettings store settings
*/
- void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler);
+ void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) throws StoreException;
+ /**
+ * Recovers configuration from the store using given recovery handler
+ *
+ * @param recoveryHandler recovery handler
+ */
+ void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
/**
* Makes the specified object persistent.
@@ -86,8 +85,8 @@ public interface DurableConfigurationStore
void update(UUID id, String type, Map<String, Object> attributes) throws StoreException;
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
+ void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
- void close() throws Exception;
+ void closeConfigurationStore() throws StoreException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 2f869201cf..a1512bbc22 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -67,12 +67,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public void configureConfigStore(final VirtualHost virtualHost, final ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _name = virtualHost.getName();
+ _name = virtualHostName;
- setup(virtualHost);
+ setup(storeSettings);
load();
+ }
+
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ {
recoveryHandler.beginConfigurationRecovery(this,_configVersion);
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
for(ConfiguredObjectRecord record : records)
@@ -87,11 +92,8 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
}
- private void setup(final VirtualHost virtualHost)
+ private void setup(final Map<String, Object> configurationStoreSettings)
{
- @SuppressWarnings("unchecked")
- Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
-
Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
if(!(storePathAttr instanceof String))
{
@@ -510,12 +512,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore
save();
}
- public void close() throws Exception
+ @Override
+ public void closeConfigurationStore()
{
try
{
releaseFileLock();
}
+ catch (IOException e)
+ {
+ throw new StoreException("Failed to release lock", e);
+ }
finally
{
_fileLock = null;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index b3a6216c84..fb99191477 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -34,21 +34,20 @@ public interface MessageStore
String OVERFULL_SIZE = "storeOverfullSize";
/**
- * Called after instantiation in order to configure the message store. A particular implementation can define
+ * Called after instantiation in order to open and initialize the message store. A particular implementation can define
* whatever parameters it wants.
- *
- *
- *
- *
- * @param virtualHost
- * @param messageRecoveryHandler Handler to be called as the store recovers on start up
- * @param tlogRecoveryHandler
- * @throws Exception If any error occurs that means the store is unable to configure itself.
+ * @param virtualHostName virtual host name
+ * @param messageStoreSettings store settings
*/
- void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler);
+ void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings);
- void activate();
+ /**
+ * Called after opening to recover messages and transactions with given recovery handlers
+ *
+ * @param messageRecoveryHandler
+ * @param transactionLogRecoveryHandler
+ */
+ void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -64,10 +63,8 @@ public interface MessageStore
/**
* Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
*/
- void close();
+ void closeMessageStore();
void addEventListener(EventListener eventListener, Event... events);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index c579a27731..c095675602 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -22,12 +22,15 @@ package org.apache.qpid.server.store;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.VirtualHost;
-
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
}
@@ -59,13 +62,17 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public void closeMessageStore()
{
}
@Override
- public void close()
+ public void closeConfigurationStore()
{
}
@@ -88,7 +95,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 3ffa34f4fa..a14b1ad8c1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -68,6 +68,7 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.MapValueConverter;
@@ -572,10 +573,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
//Close MessageStore
if (getMessageStore() != null)
{
- //Remove MessageStore Interface should not throw Exception
+ // TODO Remove MessageStore Interface should not throw Exception
try
{
- getMessageStore().close();
+ getMessageStore().closeMessageStore();
}
catch (Exception e)
{
@@ -584,14 +585,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
if (getDurableConfigurationStore() != null)
{
- //Remove MessageStore Interface should not throw Exception
try
{
- getDurableConfigurationStore().close();
+ getDurableConfigurationStore().closeConfigurationStore();
}
- catch (Exception e)
+ catch (StoreException e)
{
- _logger.error("Failed to close message store", e);
+ _logger.error("Failed to close configuration store", e);
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index efdca3b67d..2d824cbd2d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -20,16 +20,23 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
@@ -38,19 +45,38 @@ import org.apache.qpid.server.store.NonNullUpgrader;
import org.apache.qpid.server.store.NullUpgrader;
import org.apache.qpid.server.store.UpgraderProvider;
-import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
-
public class DefaultUpgraderProvider implements UpgraderProvider
{
+ private static final Logger LOGGER = Logger.getLogger(DefaultUpgraderProvider.class);
+
public static final String EXCLUSIVE = "exclusive";
+ public static final String NAME = "name";
private final ExchangeRegistry _exchangeRegistry;
private final VirtualHost _virtualHost;
+ @SuppressWarnings("serial")
+ private static final Map<String, String> DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap<String, String>()
+ {{
+ put("amq.direct", "direct");
+ put("amq.topic", "topic");
+ put("amq.fanout", "fanout");
+ put("amq.match", "headers");
+ }});
+
+ private final Map<String, UUID> _defaultExchangeIds;
+
public DefaultUpgraderProvider(final VirtualHost virtualHost,
final ExchangeRegistry exchangeRegistry)
{
_virtualHost = virtualHost;
_exchangeRegistry = exchangeRegistry;
+ Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
+ for (String exchangeName : DEFAULT_EXCHANGES.keySet())
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, _virtualHost.getName());
+ defaultExchangeIds.put(exchangeName, id);
+ }
+ _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds);
}
public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
@@ -66,6 +92,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
case 3:
currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
+ case 4:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader());
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -131,6 +159,11 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
else
{
+ if (_defaultExchangeIds.get("amq.topic").equals(exchangeId))
+ {
+ return true;
+ }
+
return _exchangeRegistry.getExchange(exchangeId) != null
&& _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
}
@@ -209,6 +242,10 @@ public class DefaultUpgraderProvider implements UpgraderProvider
private boolean unknownExchange(final String exchangeIdString)
{
UUID exchangeId = UUID.fromString(exchangeIdString);
+ if (_defaultExchangeIds.containsValue(exchangeId))
+ {
+ return false;
+ }
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
|| _exchangeRegistry.getExchange(exchangeId) != null);
@@ -310,4 +347,50 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
}
+ private class Version4Upgrader extends NonNullUpgrader
+ {
+ private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
+
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ if(Exchange.class.getSimpleName().equals(type))
+ {
+ String name = (String)attributes.get(NAME);
+ _missingAmqpExchanges.remove(name);
+ }
+
+ getNextUpgrader().configuredObject(id,type,attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
+ {
+ String name = entry.getKey();
+ String type = entry.getValue();
+ UUID id = _defaultExchangeIds.get(name);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating amqp exchange " + name + " with id " + id);
+ }
+
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
+
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id, Exchange.class.getSimpleName(), attributes));
+
+ getNextUpgrader().configuredObject(id, Exchange.class.getSimpleName(), attributes);
+
+ }
+
+ getNextUpgrader().complete();
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index be6cc52981..58a0e689cb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -92,14 +92,17 @@ public class StandardVirtualHost extends AbstractVirtualHost
DurableConfigurationRecoverer configRecoverer =
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
- _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer);
+ _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), _durableConfigurationStore == _messageStore ? messageStoreSettings: configurationStoreSettings);
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
- _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler);
+ _messageStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings());
+
+ _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
+ // If store does not have entries for standard exchanges (amq.*), the following will create them.
initialiseModel();
- _messageStore.activate();
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
attainActivation();
}
@@ -116,4 +119,5 @@ public class StandardVirtualHost extends AbstractVirtualHost
return _durableConfigurationStore;
}
-}
+
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 76d460b296..b4dfbe837d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
import java.io.File;
import java.util.HashMap;
@@ -41,17 +40,13 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
@@ -69,15 +64,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private String _storePath;
private String _storeName;
- private MessageStore _messageStore;
- private VirtualHost _virtualHost;
private ConfigurationRecoveryHandler _recoveryHandler;
- private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
- private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
- private TransactionLogRecoveryHandler _logRecoveryHandler;
- private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
private ExchangeImpl _exchange = mock(ExchangeImpl.class);
private static final String ROUTING_KEY = "routingKey";
@@ -86,40 +74,28 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private UUID _queueId;
private UUID _exchangeId;
private DurableConfigurationStore _configStore;
- protected Map<String, Object> _messageStoreSettings;
+ protected Map<String, Object> _configurationStoreSettings;
public void setUp() throws Exception
{
super.setUp();
- _messageStoreSettings = new HashMap<String, Object>();
+ _configurationStoreSettings = new HashMap<String, Object>();
_queueId = UUIDGenerator.generateRandomUUID();
_exchangeId = UUIDGenerator.generateRandomUUID();
_storeName = getName();
_storePath = TMP_FOLDER + File.separator + _storeName;
- _messageStoreSettings.put(MessageStore.STORE_PATH, _storePath);
+ _configurationStoreSettings.put(MessageStore.STORE_PATH, _storePath);
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
_recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
- _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
- _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
- _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
- _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
- _virtualHost = mock(VirtualHost.class);
- when(_virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
-
- when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
- when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
- when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
+ when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
when(_exchange.getId()).thenReturn(_exchangeId);
when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
when(_exchange.getEventLogger()).thenReturn(new EventLogger());
- when(_virtualHost.getMessageStoreSettings()).thenReturn(_messageStoreSettings);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -133,7 +109,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
try
{
- closeMessageStore();
closeConfigStore();
FileUtils.delete(new File(_storePath), true);
}
@@ -446,123 +421,20 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private void reopenStore() throws Exception
{
- closeMessageStore();
closeConfigStore();
- _messageStore = createMessageStore();
_configStore = createConfigStore();
- _configStore.configureConfigStore(_virtualHost, _recoveryHandler);
- _messageStore.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler);
- _messageStore.activate();
+ _configStore.openConfigurationStore("testName", _configurationStoreSettings);
+ _configStore.recoverConfigurationStore(_recoveryHandler);
}
- protected abstract MessageStore createMessageStore() throws Exception;
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
- protected abstract void closeMessageStore() throws Exception;
- protected abstract void closeConfigStore() throws Exception;
- public void testRecordXid() throws Exception
+ protected void closeConfigStore() throws Exception
{
- Record enqueueRecord = getTestRecord(1);
- Record dequeueRecord = getTestRecord(2);
- Record[] enqueues = { enqueueRecord };
- Record[] dequeues = { dequeueRecord };
- byte[] globalId = new byte[] { 1 };
- byte[] branchId = new byte[] { 2 };
-
- Transaction transaction = _messageStore.newTransaction();
- transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
- transaction.commitTran();
- reopenStore();
- verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
-
- transaction = _messageStore.newTransaction();
- transaction.removeXid(1l, globalId, branchId);
- transaction.commitTran();
-
- reopenStore();
- verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
- }
-
- private Record getTestRecord(long messageNumber)
- {
- UUID queueId1 = UUIDGenerator.generateRandomUUID();
- TransactionLogResource queue1 = mock(TransactionLogResource.class);
- when(queue1.getId()).thenReturn(queueId1);
- EnqueueableMessage message1 = mock(EnqueueableMessage.class);
- when(message1.isPersistent()).thenReturn(true);
- when(message1.getMessageNumber()).thenReturn(messageNumber);
- final StoredMessage storedMessage = mock(StoredMessage.class);
- when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
- when(message1.getStoredMessage()).thenReturn(storedMessage);
- Record enqueueRecord = new TestRecord(queue1, message1);
- return enqueueRecord;
- }
-
- private static class TestRecord implements Record
- {
- private TransactionLogResource _queue;
- private EnqueueableMessage _message;
-
- public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
- {
- super();
- _queue = queue;
- _message = message;
- }
-
- @Override
- public TransactionLogResource getResource()
- {
- return _queue;
- }
-
- @Override
- public EnqueueableMessage getMessage()
+ if (_configStore != null)
{
- return _message;
+ _configStore.closeConfigurationStore();
}
-
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
- result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (!(obj instanceof Record))
- {
- return false;
- }
- Record other = (Record) obj;
- if (_message == null && other.getMessage() != null)
- {
- return false;
- }
- if (_queue == null && other.getResource() != null)
- {
- return false;
- }
- if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
- {
- return false;
- }
- return _queue.getId().equals(other.getResource().getId());
- }
-
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
new file mode 100644
index 0000000000..8f2d0029f6
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase
+{
+ @Override
+ protected DurableConfigurationStore createConfigStore() throws Exception
+ {
+ return new JsonFileConfigStore();
+ }
+
+ @Override
+ public void testBindQueue() throws Exception
+ {
+ // TODO: Temporarily disable the test as it is already fixed on trunk
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index 96609ae992..5d2998de86 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -28,9 +28,10 @@ import java.util.UUID;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
import org.mockito.InOrder;
import static org.mockito.Matchers.any;
@@ -46,36 +47,34 @@ import static org.mockito.Mockito.when;
public class JsonFileConfigStoreTest extends QpidTestCase
{
private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- private VirtualHost _virtualHost;
+
private JsonFileConfigStore _store;
private HashMap<String, Object> _configurationStoreSettings;
+ private String _virtualHostName;
+ private File _storeLocation;
@Override
public void setUp() throws Exception
{
super.setUp();
- removeStoreFile();
- _virtualHost = mock(VirtualHost.class);
- when(_virtualHost.getName()).thenReturn(getName());
+ _virtualHostName = getName();
+ _storeLocation = TestFileUtils.createTestDirectory("json", true);
_configurationStoreSettings = new HashMap<String, Object>();
_configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
- _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, TMP_FOLDER);
- when(_virtualHost.getConfigurationStoreSettings()).thenReturn(_configurationStoreSettings);
+ _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
_store = new JsonFileConfigStore();
}
@Override
public void tearDown() throws Exception
{
- removeStoreFile();
- }
-
- private void removeStoreFile()
- {
- File file = new File(TMP_FOLDER, getName() + ".json");
- if(file.exists())
+ try
+ {
+ super.tearDown();
+ }
+ finally
{
- file.delete();
+ FileUtils.delete(_storeLocation, true);
}
}
@@ -84,7 +83,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, null);
try
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
fail("Store should not successfully configure if there is no path set");
}
catch (ServerScopedRuntimeException e)
@@ -99,7 +98,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, System.getProperty("file.separator"));
try
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
fail("Store should not successfully configure if there is an invalid path set");
}
catch (ServerScopedRuntimeException e)
@@ -110,12 +109,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testStartFromNoStore() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
InOrder inorder = inOrder(_recoveryHandler);
inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap());
inorder.verify(_recoveryHandler).completeConfigurationRecovery();
- _store.close();
+ _store.closeConfigurationStore();
}
public void testUpdatedConfigVersionIsRetained() throws Exception
@@ -123,10 +123,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final int NEW_CONFIG_VERSION = 42;
when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
- _store.close();
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
+ _store.closeConfigurationStore();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
InOrder inorder = inOrder(_recoveryHandler);
// first time the config version should be the initial version - 0
@@ -135,27 +137,28 @@ public class JsonFileConfigStoreTest extends QpidTestCase
// second time the config version should be the updated version
inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
- _store.close();
+ _store.closeConfigurationStore();
}
public void testCreateObject() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
_store.create(queueId, queueType, queueAttr);
- _store.close();
+ _store.closeConfigurationStore();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
- _store.close();
+ _store.closeConfigurationStore();
}
public void testCreateAndUpdateObject() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
@@ -167,17 +170,18 @@ public class JsonFileConfigStoreTest extends QpidTestCase
queueAttr.put("owner", "theowner");
_store.update(queueId, queueType, queueAttr);
- _store.close();
+ _store.closeConfigurationStore();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr));
- _store.close();
+ _store.closeConfigurationStore();
}
public void testCreateAndRemoveObject() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
@@ -187,16 +191,17 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.remove(queueId, queueType);
- _store.close();
+ _store.closeConfigurationStore();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap());
- _store.close();
+ _store.closeConfigurationStore();
}
public void testCreateUnknownObjectType() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
try
{
_store.create(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap());
@@ -210,7 +215,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testTwoObjectsWithSameId() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID id = UUID.randomUUID();
_store.create(id, "Queue", Collections.<String, Object>emptyMap());
try
@@ -227,11 +232,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testChangeTypeOfObject() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID id = UUID.randomUUID();
_store.create(id, "Queue", Collections.<String, Object>emptyMap());
- _store.close();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.closeConfigurationStore();
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
try
{
@@ -246,21 +251,21 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testLockFileGuaranteesExclusiveAccess() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
JsonFileConfigStore secondStore = new JsonFileConfigStore();
try
{
- secondStore.configureConfigStore(_virtualHost, _recoveryHandler);
+ secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
fail("Should not be able to open a second store with the same path");
}
catch(ServerScopedRuntimeException e)
{
// pass
}
- _store.close();
- secondStore.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.closeConfigurationStore();
+ secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
}
@@ -268,7 +273,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testCreatedNestedObjects() throws Exception
{
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final UUID queue2Id = new UUID(1, 1);
@@ -290,14 +295,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.update(true,
new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes),
new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes));
- _store.close();
- _store.configureConfigStore(_virtualHost, _recoveryHandler);
+ _store.closeConfigurationStore();
+ _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR));
verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR));
verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR));
verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes));
verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes));
- _store.close();
+ _store.closeConfigurationStore();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 908f3fe6e1..e46a8939f4 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -27,11 +27,11 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -49,7 +49,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
private UUID _transactionResource;
protected abstract MessageStore createStore() throws Exception;
- protected abstract VirtualHost<?> createVirtualHost(String storeLocation);
+ protected abstract Map<String, Object> createStoreSettings(String storeLocation);
protected abstract int getNumberOfMessagesToFillStore();
@Override
@@ -61,14 +61,14 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
FileUtils.delete(_storeLocation, true);
- VirtualHost<?> vhost = createVirtualHost(_storeLocation.getAbsolutePath());
+ Map<String, Object> storeSettings = createStoreSettings(_storeLocation.getAbsolutePath());
_store = createStore();
- ((DurableConfigurationStore)_store).configureConfigStore(vhost, null);
+ ((DurableConfigurationStore)_store).openConfigurationStore("test", storeSettings);
MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
- _store.configureMessageStore(vhost, recoveryHandler, null);
- _store.activate();
+ _store.openMessageStore("test", storeSettings);
+ _store.recoverMessageStore(recoveryHandler, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
@@ -87,7 +87,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
{
if (_store != null)
{
- _store.close();
+ _store.closeMessageStore();
}
FileUtils.delete(_storeLocation, true);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 7ebfd54df6..45f7a2a39e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -23,50 +23,52 @@ package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
public abstract class MessageStoreTestCase extends QpidTestCase
{
- private ConfigurationRecoveryHandler _recoveryHandler;
private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
private TransactionLogRecoveryHandler _logRecoveryHandler;
private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
- private VirtualHost _virtualHost;
private MessageStore _store;
+ private Map<String, Object> _storeSettings;
public void setUp() throws Exception
{
super.setUp();
- _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
- _virtualHost = mock(VirtualHost.class);
-
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
- setUpStoreConfiguration(_virtualHost);
- when(_virtualHost.getName()).thenReturn(getTestName());
+ _storeSettings = getStoreSettings();
_store = createMessageStore();
- ((DurableConfigurationStore)_store).configureConfigStore(_virtualHost, _recoveryHandler);
- _store.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _store.openMessageStore("test", _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
}
- protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception;
+ protected abstract Map<String, Object> getStoreSettings() throws Exception;
protected abstract MessageStore createMessageStore();
@@ -75,4 +77,116 @@ public abstract class MessageStoreTestCase extends QpidTestCase
return _store;
}
+ public void testRecordXid() throws Exception
+ {
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+ byte[] globalId = new byte[] { 1 };
+ byte[] branchId = new byte[] { 2 };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ transaction = _store.newTransaction();
+ transaction.removeXid(1l, globalId, branchId);
+ transaction.commitTran();
+
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+ }
+
+ private void reopenStore() throws Exception
+ {
+ _store.closeMessageStore();
+
+ _store = createMessageStore();
+ _store.openMessageStore("test", _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+ }
+ private Record getTestRecord(long messageNumber)
+ {
+ UUID queueId1 = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource queue1 = mock(TransactionLogResource.class);
+ when(queue1.getId()).thenReturn(queueId1);
+ EnqueueableMessage message1 = mock(EnqueueableMessage.class);
+ when(message1.isPersistent()).thenReturn(true);
+ when(message1.getMessageNumber()).thenReturn(messageNumber);
+ final StoredMessage storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
+ when(message1.getStoredMessage()).thenReturn(storedMessage);
+ Record enqueueRecord = new TestRecord(queue1, message1);
+ return enqueueRecord;
+ }
+
+ private static class TestRecord implements Record
+ {
+ private TransactionLogResource _queue;
+ private EnqueueableMessage _message;
+
+ public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ super();
+ _queue = queue;
+ _message = message;
+ }
+
+ @Override
+ public TransactionLogResource getResource()
+ {
+ return _queue;
+ }
+
+ @Override
+ public EnqueueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
+ result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (!(obj instanceof Record))
+ {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (_message == null && other.getMessage() != null)
+ {
+ return false;
+ }
+ if (_queue == null && other.getResource() != null)
+ {
+ return false;
+ }
+ if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ return _queue.getId().equals(other.getResource().getId());
+ }
+
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 9f1f6f48c0..e3d56ab4bb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -32,13 +32,16 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.FanoutExchange;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueFactory;
@@ -59,19 +62,21 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-
import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
public class DurableConfigurationRecovererTest extends QpidTestCase
{
+ private static final String VIRTUAL_HOST_NAME = "test";
private static final UUID QUEUE_ID = new UUID(0,0);
- private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1);
- private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2);
+ private static final UUID TOPIC_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(TopicExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME);
+ private static final UUID DIRECT_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(DirectExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME);
private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
private DurableConfigurationRecoverer _durableConfigurationRecoverer;
- private ExchangeImpl _directExchange;
- private ExchangeImpl _topicExchange;
+ private ExchangeImpl<?> _directExchange;
+ private ExchangeImpl<?> _topicExchange;
+ private ExchangeImpl<?> _matchExchange;
+ private ExchangeImpl<?> _fanoutExchange;
private VirtualHost _vhost;
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
@@ -83,21 +88,19 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
{
super.setUp();
+ _exchangeFactory = mock(ExchangeFactory.class);
- _directExchange = mock(ExchangeImpl.class);
- when(_directExchange.getExchangeType()).thenReturn(DirectExchange.TYPE);
-
-
- _topicExchange = mock(ExchangeImpl.class);
- when(_topicExchange.getExchangeType()).thenReturn(TopicExchange.TYPE);
+ _directExchange = createAndRegisterDefaultExchangeWithFactory(DirectExchange.TYPE);
+ _topicExchange = createAndRegisterDefaultExchangeWithFactory(TopicExchange.TYPE);
+ _matchExchange = createAndRegisterDefaultExchangeWithFactory(HeadersExchange.TYPE);
+ _fanoutExchange = createAndRegisterDefaultExchangeWithFactory(FanoutExchange.TYPE);
- AMQQueue queue = mock(AMQQueue.class);
+ AMQQueue<?> queue = mock(AMQQueue.class);
_vhost = mock(VirtualHost.class);
+ when(_vhost.getName()).thenReturn(VIRTUAL_HOST_NAME);
_exchangeRegistry = mock(ExchangeRegistry.class);
- when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
- when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
@@ -165,7 +168,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
});
- _exchangeFactory = mock(ExchangeFactory.class);
DurableConfiguredObjectRecoverer[] recoverers = {
@@ -187,6 +189,19 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
+ private ExchangeImpl<?> createAndRegisterDefaultExchangeWithFactory(ExchangeType<?> exchangeType) throws AMQUnknownExchangeType, UnknownExchangeException
+ {
+ ExchangeImpl exchange = mock(ExchangeImpl.class);
+ when(exchange.getExchangeType()).thenReturn(exchangeType);
+ Map<String, Object> directExchangeAttrsWithId = new HashMap<String, Object>();
+ directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID(exchangeType.getDefaultExchangeName(), VIRTUAL_HOST_NAME));
+ directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
+ directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeType.getType());
+ directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.NAME, exchangeType.getDefaultExchangeName());
+ when(_exchangeFactory.restoreExchange(directExchangeAttrsWithId)).thenReturn(exchange);
+ return exchange;
+ }
+
public void testUpgradeEmptyStore() throws Exception
{
_durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0);
@@ -275,12 +290,29 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
{
Map arguments = attributesCaptor.getValue();
- if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
+ String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME);
+ if(CUSTOM_EXCHANGE_NAME.equals(exchangeName)
&& HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
- && customExchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID)))
+ && customExchangeId.equals((UUID) arguments.get(org.apache.qpid.server.model.Exchange.ID)))
{
return customExchange;
}
+ else if ("amq.topic".equals(exchangeName))
+ {
+ return _topicExchange;
+ }
+ else if ("amq.direct".equals(exchangeName))
+ {
+ return _directExchange;
+ }
+ else if ("amq.fanout".equals(exchangeName))
+ {
+ return _fanoutExchange;
+ }
+ else if ("amq.match".equals(exchangeName))
+ {
+ return _matchExchange;
+ }
else
{
return null;
@@ -410,12 +442,29 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
{
Map arguments = attributesCaptor.getValue();
- if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME))
+ String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME);
+ if(CUSTOM_EXCHANGE_NAME.equals(exchangeName)
&& HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
&& exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID)))
{
return customExchange;
}
+ else if ("amq.topic".equals(exchangeName))
+ {
+ return _topicExchange;
+ }
+ else if ("amq.direct".equals(exchangeName))
+ {
+ return _directExchange;
+ }
+ else if ("amq.fanout".equals(exchangeName))
+ {
+ return _fanoutExchange;
+ }
+ else if ("amq.match".equals(exchangeName))
+ {
+ return _matchExchange;
+ }
else
{
return null;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index e31cb16d27..5872383702 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -101,27 +102,6 @@ public class StandardVirtualHostTest extends QpidTestCase
customBindingTestImpl(new String[0]);
}
- /**
- * Tests that specifying custom routing keys for a queue in the configuration file results in failure
- * to create the vhost (since this is illegal, only queue names are used with the default exchange)
- */
- public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception
- {
- final String queueName = getName();
- final String customBinding = "custom-binding";
- writeConfigFile(queueName, queueName, null, false, new String[]{customBinding});
-
- try
- {
- createVirtualHost(queueName);
- fail("virtualhost creation should have failed due to illegal configuration");
- }
- catch (IllegalConfigurationException e)
- {
- // pass
- }
- }
-
public void testVirtualHostBecomesActive() throws Exception
{
writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]);
@@ -145,6 +125,7 @@ public class StandardVirtualHostTest extends QpidTestCase
assertNotNull(vhost);
assertEquals(State.ACTIVE, vhost.getState());
vhost.close();
+ _virtualHostRegistry.unregisterVirtualHost(vhost);
assertEquals(State.STOPPED, vhost.getState());
assertEquals(0, vhost.getHouseKeepingActiveCount());
}
@@ -156,6 +137,7 @@ public class StandardVirtualHostTest extends QpidTestCase
assertNotNull(host);
assertEquals(State.ACTIVE, host.getState());
host.close();
+ _virtualHostRegistry.unregisterVirtualHost(host);
assertEquals(State.STOPPED, host.getState());
assertEquals(0, host.getHouseKeepingActiveCount());
}
@@ -296,16 +278,11 @@ public class StandardVirtualHostTest extends QpidTestCase
objectMapper.writeValue(new File(_storeFolder, vhostName + ".json"), data);
JsonFileConfigStore store = new JsonFileConfigStore();
- org.apache.qpid.server.model.VirtualHost<?> virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class);
- when(virtualHost.getName()).thenReturn(vhostName);
Map<String, Object> configurationStoreSettings = new HashMap<String, Object>();
- when(virtualHost.getConfigurationStoreSettings()).thenReturn(configurationStoreSettings);
configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE);
configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _storeFolder.getAbsolutePath());
- ConfigurationRecoveryHandler recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- when(recoveryHandler.completeConfigurationRecovery()).thenReturn(org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION);
- store.configureConfigStore(virtualHost , recoveryHandler );
+ store.openConfigurationStore(vhostName, configurationStoreSettings);
UUID exchangeId = UUIDGenerator.generateExchangeUUID(exchangeName == null? "amq.direct" : exchangeName, vhostName);
if(exchangeName != null && !dontDeclare)
@@ -355,7 +332,7 @@ public class StandardVirtualHostTest extends QpidTestCase
}
store.create(UUID.randomUUID(), org.apache.qpid.server.model.Binding.class.getSimpleName(), attributes);
}
- store.close();
+ store.closeConfigurationStore();
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index f7b65075f7..2c2938cb01 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
@@ -125,28 +124,13 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
}
@Override
- protected void implementationSpecificConfiguration(String name,
- VirtualHost virtualHost)
+ protected void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings)
throws ClassNotFoundException
{
//Update to pick up QPID_WORK and use that as the default location not just derbyDB
-
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
- Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
_driverClass = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- String databasePath = null;
- if (isConfigStoreOnly())
- {
- databasePath = (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
- }
- else
- {
- if (messageStoreSettings != null)
- {
- databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
- }
- }
+ String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);;
if(databasePath == null)
{
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
index 4a1a3251e3..aaf65e9ee0 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
@@ -25,48 +25,10 @@ import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase;
public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase
{
- private DerbyMessageStore _derbyMessageStore;
-
- @Override
- protected DerbyMessageStore createMessageStore() throws Exception
- {
- createStoreIfNecessary();
- return _derbyMessageStore;
- }
-
- @Override
- protected void closeMessageStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
- private void createStoreIfNecessary()
- {
- if(_derbyMessageStore == null)
- {
- _derbyMessageStore = new DerbyMessageStore();
- }
- }
-
@Override
protected DerbyMessageStore createConfigStore() throws Exception
{
- createStoreIfNecessary();
- return _derbyMessageStore;
+ return new DerbyMessageStore();
}
- @Override
- protected void closeConfigStore() throws Exception
- {
- closeStoreIfNecessary();
- }
-
- private void closeStoreIfNecessary() throws Exception
- {
- if (_derbyMessageStore != null)
- {
- _derbyMessageStore.close();
- _derbyMessageStore = null;
- }
- }
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
index f23b5a3e23..ba7ae26292 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -20,14 +20,10 @@
*/
package org.apache.qpid.server.store.derby;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
@@ -58,20 +54,15 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes
}
@Override
- protected VirtualHost<?> createVirtualHost(String storeLocation)
+ protected Map<String, Object> createStoreSettings(String storeLocation)
{
_logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
- VirtualHost<?> vhost = mock(VirtualHost.class);
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation);
messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE);
messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE);
-
- when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings );
- when(vhost.getName()).thenReturn("test");
-
- return vhost;
+ return messageStoreSettings;
}
}
diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index e3f91cc8fb..20de4ea339 100644
--- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.server.store.derby;
-import static org.mockito.Mockito.when;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.util.FileUtils;
@@ -53,7 +51,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
File location = new File(_storeLocation);
assertTrue("Store does not exist at " + _storeLocation, location.exists());
- getStore().close();
+ getStore().closeMessageStore();
assertTrue("Store does not exist at " + _storeLocation, location.exists());
getStore().onDelete();
@@ -61,13 +59,13 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
}
@Override
- protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
+ protected Map<String, Object> getStoreSettings() throws Exception
{
_storeLocation = TMP_FOLDER + File.separator + getTestName();
+ deleteStoreIfExists();
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
- when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
- deleteStoreIfExists();
+ return messageStoreSettings;
}
private void deleteStoreIfExists()
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 2c2b701c61..55c8d3ef79 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.MessageStore;
@@ -259,18 +258,24 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
@Override
protected void doClose()
{
- while(!_transactions.isEmpty())
- {
- RecordedJDBCTransaction txn = _transactions.get(0);
- txn.abortTran();
- }
try
{
- _connectionProvider.close();
+ while(!_transactions.isEmpty())
+ {
+ RecordedJDBCTransaction txn = _transactions.get(0);
+ txn.abortTran();
+ }
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Unable to close connection provider ", e);
+ try
+ {
+ _connectionProvider.close();
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to close connection provider ", e);
+ }
}
}
@@ -281,11 +286,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
- protected void implementationSpecificConfiguration(String name,
- VirtualHost virtualHost)
+ protected void implementationSpecificConfiguration(String name, Map<String, Object> storeSettings)
throws ClassNotFoundException, SQLException
{
- Map<String, Object> storeSettings = isConfigStoreOnly() ? virtualHost.getConfigurationStoreSettings() : virtualHost.getMessageStoreSettings();
String connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL));
Object poolAttribute = storeSettings.get(CONNECTION_POOL);
diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index dc6bb0158d..2322fa7102 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store.jdbc;
-import static org.mockito.Mockito.when;
-
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -32,7 +30,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
@@ -55,21 +52,21 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
public void testOnDelete() throws Exception
{
- String[] expectedTables = JDBCMessageStore.ALL_TABLES;
+ Set<String> expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES;
assertTablesExist(expectedTables, true);
- getStore().close();
+ getStore().closeMessageStore();
assertTablesExist(expectedTables, true);
getStore().onDelete();
assertTablesExist(expectedTables, false);
}
@Override
- protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception
+ protected Map<String, Object> getStoreSettings()
{
_connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL);
- when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
+ return messageStoreSettings;
}
@@ -79,7 +76,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
return new JDBCMessageStore();
}
- private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException
+ private void assertTablesExist(Set<String> expectedTables, boolean exists) throws SQLException
{
Set<String> existingTables = getTableNames();
for (String tableName : expectedTables)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 7017ea6d45..06b7091a47 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.model.VirtualHost;
public class
QuotaMessageStore extends NullMessageStore
@@ -49,9 +48,15 @@ public class
}
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+
+ }
+
+ @Override
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ {
+ _stateManager.attainState(State.INITIALISING);
Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null
? Long.MAX_VALUE
@@ -72,18 +77,11 @@ public class
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
- _stateManager.attainState(State.INITIALISING);
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
_stateManager.attainState(State.INITIALISED);
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
_stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
@@ -153,7 +151,7 @@ public class
}
@Override
- public void close()
+ public void closeMessageStore()
{
if (_closed.compareAndSet(false, true))
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 84d1a9cb1c..44a5252355 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
import org.apache.qpid.server.plugin.MessageStoreFactory;
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@@ -47,51 +47,26 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
- private MessageStore _realStore = null;
- private DurableConfigurationStore _durableConfigurationStore = null;
+ private MessageStore _realMessageStore = null;
+ private DurableConfigurationStore _realDurableConfigurationStore = null;
private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>();
- // ***** MessageStore Interface.
-
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
-
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
- Object delaysAttr = messageStoreSettings.get(DELAYS);
-
- @SuppressWarnings({ "unchecked" })
- Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
- configureDelays(delays);
-
- final Object realStoreAttr = messageStoreSettings.get(REAL_STORE);
- String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString();
-
- if (delays.containsKey(DEFAULT_DELAY))
- {
- _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
- }
-
- _realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore();
-
- if (!_eventListeners.isEmpty())
- {
- for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
- {
- Map.Entry<EventListener, Event[]> entry = it.next();
- _realStore.addEventListener(entry.getKey(), entry.getValue());
- it.remove();
- }
- }
-
- if (_realStore instanceof DurableConfigurationStore)
+ if (storeSettings != null && storeSettings.get(REAL_STORE) != null)
{
- _durableConfigurationStore = (DurableConfigurationStore)_realStore;
- _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler);
+ final String realStore = (String) storeSettings.get(REAL_STORE);
+ _realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore);
+ _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings);
}
+ }
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ {
+ _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
}
private void configureDelays(Map<String, Object> delays)
@@ -159,31 +134,67 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler);
+ Object delaysAttr = messageStoreSettings.get(DELAYS);
+
+ @SuppressWarnings({ "unchecked" })
+ Map<String,Object> delays = (delaysAttr instanceof Map) ? (Map<String,Object>) delaysAttr : Collections.<String,Object>emptyMap();
+ configureDelays(delays);
+
+ if (delays.containsKey(DEFAULT_DELAY))
+ {
+ _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY)));
+ }
+
+ final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE);
+ final String realStore = (String) realStoreAttr;
+ _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore();
+
+ if (!_eventListeners.isEmpty())
+ {
+ for (Iterator<Map.Entry<EventListener, Event[]>> it = _eventListeners.entrySet().iterator(); it.hasNext();)
+ {
+ Map.Entry<EventListener, Event[]> entry = it.next();
+ _realMessageStore.addEventListener(entry.getKey(), entry.getValue());
+ it.remove();
+ }
+ }
+ _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings);
+
+ if (_realDurableConfigurationStore == null)
+ {
+ _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore;
+ }
+
}
@Override
- public void close()
+ public void closeMessageStore()
{
doPreDelay("close");
- _realStore.close();
+ _realMessageStore.closeMessageStore();
doPostDelay("close");
}
@Override
+ public void closeConfigurationStore()
+ {
+ _realDurableConfigurationStore.closeConfigurationStore();
+ }
+
+
+ @Override
public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- return _realStore.addMessage(metaData);
+ return _realMessageStore.addMessage(metaData);
}
@Override
public void create(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
doPreDelay("create");
- _durableConfigurationStore.create(id, type, attributes);
+ _realDurableConfigurationStore.create(id, type, attributes);
doPostDelay("create");
}
@@ -191,7 +202,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void remove(UUID id, String type) throws StoreException
{
doPreDelay("remove");
- _durableConfigurationStore.remove(id, type);
+ _realDurableConfigurationStore.remove(id, type);
doPostDelay("remove");
}
@@ -199,7 +210,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException
{
doPreDelay("remove");
- UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects);
+ UUID[] removed = _realDurableConfigurationStore.removeConfiguredObjects(objects);
doPostDelay("remove");
return removed;
}
@@ -208,7 +219,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void update(UUID id, String type, Map<String, Object> attributes) throws StoreException
{
doPreDelay("update");
- _durableConfigurationStore.update(id, type, attributes);
+ _realDurableConfigurationStore.update(id, type, attributes);
doPostDelay("update");
}
@@ -216,7 +227,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
doPreDelay("update");
- _durableConfigurationStore.update(createIfNecessary, records);
+ _realDurableConfigurationStore.update(createIfNecessary, records);
doPostDelay("update");
}
@@ -224,7 +235,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
public Transaction newTransaction()
{
doPreDelay("beginTran");
- Transaction txn = new SlowTransaction(_realStore.newTransaction());
+ Transaction txn = new SlowTransaction(_realMessageStore.newTransaction());
doPostDelay("beginTran");
return txn;
}
@@ -232,7 +243,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@Override
public boolean isPersistent()
{
- return _realStore.isPersistent();
+ return _realMessageStore.isPersistent();
}
private class SlowTransaction implements Transaction
@@ -299,28 +310,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- _realStore.activate();
+ _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
}
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- if (_realStore == null)
+ if (_realMessageStore == null)
{
_eventListeners .put(eventListener, events);
}
else
{
- _realStore.addEventListener(eventListener, events);
+ _realMessageStore.addEventListener(eventListener, events);
}
}
@Override
public String getStoreLocation()
{
- return _realStore.getStoreLocation();
+ return _realMessageStore.getStoreLocation();
}
@Override
@@ -332,7 +343,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
@Override
public void onDelete()
{
- _realStore.onDelete();
+ _realMessageStore.onDelete();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java
new file mode 100644
index 0000000000..9f244e78a4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+public class SplitStoreTest extends QpidBrokerTestCase
+{
+ private String _messageStorePath;
+ private String _configStorePath;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ String virtualHostWorkDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator;
+ _messageStorePath = virtualHostWorkDir + "messageStore";
+ _configStorePath = virtualHostWorkDir + "configStore";
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Overridden to prevent QBTC starting the Broker.
+ }
+
+ public void testJsonConfigurationStoreWithPersistentMessageStore() throws Exception
+ {
+ Map<String, Object> configurationStoreSettings = new HashMap<String, Object>();
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE);
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath);
+
+ doTest(configurationStoreSettings);
+ }
+
+ public void testSeparateConfigurationAndMessageStoresOfTheSameType() throws Exception
+ {
+ Map<String, Object> configurationStoreSettings = new HashMap<String, Object>();
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, getTestProfileMessageStoreType());
+ configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath);
+
+ doTest(configurationStoreSettings);
+ }
+
+ private void configureAndStartBroker(Map<String, Object> configurationStoreSettings) throws Exception
+ {
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
+ messageStoreSettings.put(MessageStore.STORE_PATH, _messageStorePath);
+
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
+ config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONFIGURATION_STORE_SETTINGS, configurationStoreSettings);
+
+ super.startBroker();
+ }
+
+ private void doTest(Map<String, Object> configurationStoreSettings) throws Exception
+ {
+ configureAndStartBroker(configurationStoreSettings);
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getTestQueueName());
+ session.createConsumer(queue).close(); // Create durable queue by side effect
+ sendMessage(session, queue, 2);
+ connection.close();
+
+ restartBroker();
+
+ setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ connection = getConnection();
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ session.commit();
+
+ assertNotNull("Message was not received after first restart", message);
+ assertEquals("Unexpected message received after first restart", 0, message.getIntProperty(INDEX));
+
+ stopBroker();
+ File messageStoreFile = new File(_messageStorePath);
+ FileUtils.delete(messageStoreFile, true);
+ assertFalse("Store folder was not deleted", messageStoreFile.exists());
+ super.startBroker();
+
+ connection = getConnection();
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(queue);
+ message = consumer.receive(500);
+
+ assertNull("Message was received after store removal", message);
+ }
+
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 6ce0936c08..6f7de94e5d 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -82,6 +82,7 @@ org.apache.qpid.server.AlertingTest#*
// The C++ server has a totally different persistence mechanism
org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.server.store.SplitStoreTest#*
// These tests are for the Java broker persistent store modules
org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index 2f96584589..0b060051e9 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -20,6 +20,7 @@
//These tests require a persistent store
org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.server.store.SplitStoreTest#*
org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart
org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithChanges
org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod