From fcc3f654b60b7dd2180afe73e8809545725b41af Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 25 Mar 2014 10:07:21 +0000 Subject: Introduction of separate lifecycle methods on stores for open/close/recover. Change Upgraders responsibility to create the amqp standard exchanges when upgrading from earlier store versions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1581288 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBHAVirtualHost.java | 32 ++-- .../server/store/berkeleydb/BDBMessageStore.java | 213 +++++++++++---------- .../store/berkeleydb/EnvironmentFacadeFactory.java | 4 +- .../StandardEnvironmentFacadeFactory.java | 33 +--- .../ReplicatedEnvironmentFacadeFactory.java | 4 +- .../BDBMessageStoreConfigurationTest.java | 42 +--- .../berkeleydb/BDBMessageStoreQuotaEventsTest.java | 11 +- .../store/berkeleydb/BDBMessageStoreTest.java | 12 +- .../org/apache/qpid/server/model/VirtualHost.java | 2 +- .../server/store/AbstractJDBCMessageStore.java | 204 ++++++++++---------- .../server/store/AbstractMemoryMessageStore.java | 37 ++-- .../server/store/DurableConfigurationStore.java | 23 ++- .../qpid/server/store/JsonFileConfigStore.java | 23 ++- .../org/apache/qpid/server/store/MessageStore.java | 29 ++- .../apache/qpid/server/store/NullMessageStore.java | 21 +- .../server/virtualhost/AbstractVirtualHost.java | 12 +- .../virtualhost/DefaultUpgraderProvider.java | 89 ++++++++- .../server/virtualhost/StandardVirtualHost.java | 14 +- .../AbstractDurableConfigurationStoreTestCase.java | 146 +------------- .../JsonFileConfigStoreConfigurationTest.java | 36 ++++ .../qpid/server/store/JsonFileConfigStoreTest.java | 102 +++++----- .../store/MessageStoreQuotaEventsTestBase.java | 14 +- .../qpid/server/store/MessageStoreTestCase.java | 136 +++++++++++-- .../DurableConfigurationRecovererTest.java | 85 ++++++-- .../virtualhost/StandardVirtualHostTest.java | 33 +--- .../qpid/server/store/derby/DerbyMessageStore.java | 20 +- .../derby/DerbyMessageStoreConfigurationTest.java | 40 +--- .../derby/DerbyMessageStoreQuotaEventsTest.java | 13 +- .../server/store/derby/DerbyMessageStoreTest.java | 10 +- .../qpid/server/store/jdbc/JDBCMessageStore.java | 27 +-- .../server/store/jdbc/JDBCMessageStoreTest.java | 13 +- .../qpid/server/store/QuotaMessageStore.java | 22 +-- .../apache/qpid/server/store/SlowMessageStore.java | 125 ++++++------ .../apache/qpid/server/store/SplitStoreTest.java | 130 +++++++++++++ qpid/java/test-profiles/CPPExcludes | 1 + qpid/java/test-profiles/JavaTransientExcludes | 1 + 36 files changed, 968 insertions(+), 791 deletions(-) create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index e41b81f846..7e42d09ba6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -75,18 +75,12 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore.addEventListener(new AfterInitialisationListener(), Event.AFTER_INIT); _messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger()); - - _messageStore.configureConfigStore( - virtualHost, configRecoverer + _messageStore.openConfigurationStore( + virtualHost.getName(), virtualHost.getMessageStoreSettings() ); - _messageStore.configureMessageStore( - virtualHost, recoveryHandler, - recoveryHandler + _messageStore.openMessageStore( + virtualHost.getName(), virtualHost.getMessageStoreSettings() ); // Make the virtualhost model object a replication group listener @@ -105,7 +99,8 @@ public class BDBHAVirtualHost extends AbstractVirtualHost try { _inVhostInitiatedClose = true; - getMessageStore().close(); + _messageStore.closeMessageStore(); + _messageStore.closeConfigurationStore(); } catch (Exception e) { @@ -214,7 +209,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost if (LOGGER.isInfoEnabled()) { LOGGER.info("Received BDB event indicating transition to state " + state - + " when current message store state is " + _messageStore._stateManager.getState()); + + " when current message store state is " + _messageStore._messageStoreStateManager.getState()); } switch (state) @@ -243,7 +238,14 @@ public class BDBHAVirtualHost extends AbstractVirtualHost try { _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); - _messageStore.activate(); + + DurableConfigurationRecoverer configRecoverer = + new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); + _messageStore.recoverConfigurationStore(configRecoverer); + + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this); + _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); } catch (Exception e) { @@ -256,9 +258,9 @@ public class BDBHAVirtualHost extends AbstractVirtualHost try { //TODO: move this this into the store method passivate() - if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) + if (_messageStore._messageStoreStateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) { - _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); + _messageStore._messageStoreStateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); } } catch (Exception e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 8be037eb7b..2022f36bd9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -30,12 +30,10 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -102,25 +100,20 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA"; private static String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT"; private static String DELIVERY_DB_NAME = "QUEUE_ENTRIES"; + + //TODO: Add upgrader to remove BRIDGES and LINKS private static String BRIDGEDB_NAME = "BRIDGES"; private static String LINKDB_NAME = "LINKS"; private static String XID_DB_NAME = "XIDS"; private static String CONFIG_VERSION_DB_NAME = "CONFIG_VERSION"; - private static final String[] DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, MESSAGE_META_DATA_DB_NAME, - MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME, CONFIG_VERSION_DB_NAME }; - - private final AtomicBoolean _closed = new AtomicBoolean(false); + private static final String[] CONFIGURATION_STORE_DATABASE_NAMES = new String[] { CONFIGURED_OBJECTS_DB_NAME, CONFIG_VERSION_DB_NAME }; + private static final String[] MESSAGE_STORE_DATABASE_NAMES = new String[] { MESSAGE_META_DATA_DB_NAME, MESSAGE_CONTENT_DB_NAME, DELIVERY_DB_NAME, BRIDGEDB_NAME, LINKDB_NAME, XID_DB_NAME }; private EnvironmentFacade _environmentFacade; private final AtomicLong _messageId = new AtomicLong(0); - protected final StateManager _stateManager; - - private MessageStoreRecoveryHandler _messageRecoveryHandler; - - private TransactionLogRecoveryHandler _tlogRecoveryHandler; - - private ConfigurationRecoveryHandler _configRecoveryHandler; + protected final StateManager _messageStoreStateManager; + private final StateManager _configurationStoreStateManager; private long _totalStoreSize; private boolean _limitBusted; @@ -129,12 +122,14 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private final EventManager _eventManager = new EventManager(); private final String _type; - private VirtualHost _virtualHost; private final EnvironmentFacadeFactory _environmentFacadeFactory; private volatile Committer _committer; + private String _virtualHostName; + + public BDBMessageStore() { this(new StandardEnvironmentFacadeFactory()); @@ -144,7 +139,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { _type = environmentFacadeFactory.getType(); _environmentFacadeFactory = environmentFacadeFactory; - _stateManager = new StateManager(_eventManager); + _messageStoreStateManager = new StateManager(_eventManager); + _configurationStoreStateManager = new StateManager(new EventManager()); } @Override @@ -154,94 +150,47 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _stateManager.attainState(State.INITIALISING); + _configurationStoreStateManager.attainState(State.INITIALISING); - _configRecoveryHandler = recoveryHandler; - _virtualHost = virtualHost; - } - - @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) throws StoreException - { - if(_stateManager.isInState(State.INITIAL)) + _virtualHostName = virtualHostName; + if (_environmentFacade == null) { - // Is acting as a message store, but not a durable config store - _stateManager.attainState(State.INITIALISING); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, storeSettings); } - - _messageRecoveryHandler = messageRecoveryHandler; - _tlogRecoveryHandler = tlogRecoveryHandler; - _virtualHost = virtualHost; - - - completeInitialisation(); + _configurationStoreStateManager.attainState(State.INITIALISED); } - private void completeInitialisation() throws StoreException + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { - configure(_virtualHost, _messageRecoveryHandler != null); + _configurationStoreStateManager.attainState(State.ACTIVATING); - _stateManager.attainState(State.INITIALISED); - } - - private void startActivation() throws StoreException - { DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); try { - new Upgrader(_environmentFacade.getEnvironment(), _virtualHost.getName()).upgradeIfNecessary(); - _environmentFacade.openDatabases(dbConfig, DATABASE_NAMES); - _totalStoreSize = getSizeOnDisk(); + new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary(); + _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES); } catch(DatabaseException e) { throw _environmentFacade.handleDatabaseException("Cannot configure store", e); } + recoverConfig(recoveryHandler); + _configurationStoreStateManager.attainState(State.ACTIVE); } @Override - public synchronized void activate() throws StoreException + public void openMessageStore(String virtualHostName, Map messageStoreSettings) throws StoreException { - // check if acting as a durable config store, but not a message store - if(_stateManager.isInState(State.INITIALISING)) - { - completeInitialisation(); - } - - _stateManager.attainState(State.ACTIVATING); - startActivation(); - - if(_configRecoveryHandler != null) - { - recoverConfig(_configRecoveryHandler); - } - if(_messageRecoveryHandler != null) - { - recoverMessages(_messageRecoveryHandler); - } - if(_tlogRecoveryHandler != null) - { - recoverQueueEntries(_tlogRecoveryHandler); - } + _messageStoreStateManager.attainState(State.INITIALISING); - _stateManager.attainState(State.ACTIVE); - } + _virtualHostName = virtualHostName; - @Override - public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException - { - return new BDBTransaction(); - } - - private void configure(VirtualHost virtualHost, boolean isMessageStore) throws StoreException - { - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); Object underfullAttr = messageStoreSettings.get(MessageStore.UNDERFULL_SIZE); @@ -250,16 +199,56 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _persistentSizeLowThreshold = underfullAttr == null ? _persistentSizeHighThreshold : underfullAttr instanceof Number ? ((Number) underfullAttr).longValue() : Long.parseLong(underfullAttr.toString()); - if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) { _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHost, isMessageStore); + if (_environmentFacade == null) + { + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(_virtualHostName, messageStoreSettings); + } - _committer = _environmentFacade.createCommitter(virtualHost.getName()); + _committer = _environmentFacade.createCommitter(_virtualHostName); _committer.start(); + + _messageStoreStateManager.attainState(State.INITIALISED); + } + + @Override + public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException + { + _messageStoreStateManager.attainState(State.ACTIVATING); + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + try + { + new Upgrader(_environmentFacade.getEnvironment(), _virtualHostName).upgradeIfNecessary(); + _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES); + _totalStoreSize = getSizeOnDisk(); + } + catch(DatabaseException e) + { + throw _environmentFacade.handleDatabaseException("Cannot activate message store", e); + } + + if(messageRecoveryHandler != null) + { + recoverMessages(messageRecoveryHandler); + } + if(transactionLogRecoveryHandler != null) + { + recoverQueueEntries(transactionLogRecoveryHandler); + } + + _messageStoreStateManager.attainState(State.ACTIVE); + } + + @Override + public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException + { + return new BDBTransaction(); } @Override @@ -283,35 +272,59 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * @throws Exception If the close fails. */ @Override - public void close() throws StoreException + public void closeMessageStore() throws StoreException { - if (_closed.compareAndSet(false, true)) + _messageStoreStateManager.attainState(State.CLOSING); + try { - _stateManager.attainState(State.CLOSING); - try + if (_committer != null) { - try - { - _committer.stop(); - } - finally - { - closeEnvironment(); - } + _committer.stop(); } - catch(DatabaseException e) + } + finally + { + if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL)) { - throw new StoreException("Exception occured on message store close", e); + closeEnvironment(); } - _stateManager.attainState(State.CLOSED); } + _messageStoreStateManager.attainState(State.CLOSED); + } + + @Override + public void closeConfigurationStore() throws StoreException + { + _configurationStoreStateManager.attainState(State.CLOSING); + try + { + if (_committer != null) + { + _committer.stop(); + } + } + finally + { + if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL)) + { + closeEnvironment(); + } + } + _configurationStoreStateManager.attainState(State.CLOSED); } private void closeEnvironment() { if (_environmentFacade != null) { - _environmentFacade.close(); + try + { + _environmentFacade.close(); + } + catch(DatabaseException e) + { + throw new StoreException("Exception occured on message store close", e); + } } } @@ -704,7 +717,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore @Override public void create(UUID id, String type, Map attributes) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes); storeConfiguredObjectEntry(configuredObject); @@ -774,7 +787,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Updating " +type + ", id: " + id); + LOGGER.debug("Updating " + type + ", id: " + id); } try @@ -1286,7 +1299,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore */ private void storeConfiguredObjectEntry(ConfiguredObjectRecord configuredObject) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { LOGGER.debug("Storing configured object: " + configuredObject); DatabaseEntry key = new DatabaseEntry(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index d242790efb..fd064d9b0e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.model.VirtualHost; +import java.util.Map; public interface EnvironmentFacadeFactory { public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore); + EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map storeSettings); String getType(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index 75e14a70c7..cc38b799a6 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -34,38 +32,19 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map messageStoreSettings) { - String name = virtualHost.getName(); - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); Map envConfigMap = new HashMap(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + name; + final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + virtualHostName; - String storeLocation; - if(isMessageStore) + Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); + if (environmentConfigurationAttributes instanceof Map) { - Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); - if (environmentConfigurationAttributes instanceof Map) - { - envConfigMap.putAll((Map) environmentConfigurationAttributes); - } - - storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); - } - else // we are acting only as the durable config store - { - Map configurationStoreSettings = virtualHost.getConfigurationStoreSettings(); - - Object environmentConfigurationAttributes = configurationStoreSettings.get(ENVIRONMENT_CONFIGURATION); - if (environmentConfigurationAttributes instanceof Map) - { - envConfigMap.putAll((Map) environmentConfigurationAttributes); - } - - storeLocation = (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH); + envConfigMap.putAll((Map) environmentConfigurationAttributes); } + String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); if(storeLocation == null) { storeLocation = defaultPath; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 4df62b1d0f..c6b3e48cf8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Map; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; @@ -48,9 +47,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact private static final boolean DEFAULT_COALESCING_SYNC = true; @Override - public EnvironmentFacade createEnvironmentFacade(VirtualHost virtualHost, boolean isMessageStore) + public EnvironmentFacade createEnvironmentFacade(String virtualHostName, final Map messageStoreSettings) { - final Map messageStoreSettings = virtualHost.getMessageStoreSettings(); ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java index bfe41773eb..e1678e6f65 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreConfigurationTest.java @@ -25,49 +25,9 @@ import org.apache.qpid.server.store.DurableConfigurationStore; public class BDBMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase { - private BDBMessageStore _bdbMessageStore; - - @Override - protected BDBMessageStore createMessageStore() throws Exception - { - createStoreIfNecessary(); - return _bdbMessageStore; - } - - @Override - protected void closeMessageStore() throws Exception - { - closeStoreIfNecessary(); - } - @Override protected DurableConfigurationStore createConfigStore() throws Exception { - createStoreIfNecessary(); - - return _bdbMessageStore; - } - - @Override - protected void closeConfigStore() throws Exception - { - closeStoreIfNecessary(); - } - - private void createStoreIfNecessary() - { - if(_bdbMessageStore == null) - { - _bdbMessageStore = new BDBMessageStore(); - } - } - - private void closeStoreIfNecessary() throws Exception - { - if (_bdbMessageStore != null) - { - _bdbMessageStore.close(); - _bdbMessageStore = null; - } + return new BDBMessageStore(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 65830fd1c2..f2de01445d 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,15 +20,11 @@ */ package org.apache.qpid.server.store.berkeleydb; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; @@ -62,20 +58,17 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB @Override - protected VirtualHost createVirtualHost(String storeLocation) + protected MapcreateStoreSettings(String storeLocation) { _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - VirtualHost vhost = mock(VirtualHost.class); Map messageStoreSettings = new HashMap(); messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); Map envMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); messageStoreSettings.put(EnvironmentFacadeFactory.ENVIRONMENT_CONFIGURATION, envMap); - when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings); - when(vhost.getName()).thenReturn("test"); - return vhost; + return messageStoreSettings; } @Override diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 835deb4a4c..c8fcfe0826 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; @@ -198,7 +199,7 @@ public class BDBMessageStoreTest extends MessageStoreTest String returnedPayloadString_0_10 = new String(recoveredContent.array()); assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - readOnlyStore.close(); + readOnlyStore.closeMessageStore(); } private DeliveryProperties createDeliveryProperties_0_10() @@ -233,15 +234,16 @@ public class BDBMessageStoreTest extends MessageStoreTest */ private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception { - messageStore.close(); + messageStore.closeMessageStore(); BDBMessageStore newStore = new BDBMessageStore(); MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); - newStore.configureMessageStore(getVirtualHostModel(), recoveryHandler, null); + VirtualHost virtualHost = getVirtualHostModel(); + newStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings()); - newStore.activate(); + newStore.recoverMessageStore(recoveryHandler, null); return newStore; } @@ -521,7 +523,7 @@ public class BDBMessageStoreTest extends MessageStoreTest File location = new File(storeLocation); assertTrue("Store does not exist at " + storeLocation, location.exists()); - bdbStore.close(); + bdbStore.closeMessageStore(); assertTrue("Store does not exist at " + storeLocation, location.exists()); bdbStore.onDelete(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index f4072277d5..6e8932d3ba 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -57,7 +57,7 @@ public interface VirtualHost> extends ConfiguredObject< String CONFIGURATION_STORE_SETTINGS = "configurationStoreSettings"; String MESSAGE_STORE_SETTINGS = "messageStoreSettings"; - int CURRENT_CONFIG_VERSION = 4; + int CURRENT_CONFIG_VERSION = 5; @ManagedAttribute Collection getSupportedExchangeTypes(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 81c74fe8f5..a6424a3d28 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -37,15 +37,16 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.codehaus.jackson.JsonGenerationException; @@ -79,13 +80,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; private static final int DEFAULT_CONFIG_VERSION = 0; - public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME, - XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME }; + public static final Set CONFIGURATION_STORE_TABLE_NAMES = new HashSet(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME)); + public static final Set MESSAGE_STORE_TABLE_NAMES = new HashSet(Arrays.asList(DB_VERSION_TABLE_NAME, META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, BRIDGES_TABLE_NAME, LINKS_TABLE_NAME, XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME)); + private static final int DB_VERSION = 7; private final AtomicLong _messageId = new AtomicLong(0); - private final AtomicBoolean _closed = new AtomicBoolean(false); private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )"; @@ -192,69 +193,90 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected final EventManager _eventManager = new EventManager(); - protected final StateManager _stateManager; + protected final StateManager _messageStoreStateManager; - private MessageStoreRecoveryHandler _messageRecoveryHandler; - private TransactionLogRecoveryHandler _tlogRecoveryHandler; - private ConfigurationRecoveryHandler _configRecoveryHandler; - private VirtualHost _virtualHost; + private StateManager _configurationStoreStateManager; + private boolean _initialized; public AbstractJDBCMessageStore() { - _stateManager = new StateManager(_eventManager); + _messageStoreStateManager = new StateManager(_eventManager); + _configurationStoreStateManager = new StateManager(new EventManager()); } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _stateManager.attainState(State.INITIALISING); - _configRecoveryHandler = configRecoveryHandler; - _virtualHost = virtualHost; - + _configurationStoreStateManager.attainState(State.INITIALISING); + initialiseIfNecessary(virtualHostName, storeSettings); + _configurationStoreStateManager.attainState(State.INITIALISED); } - @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) + private void initialiseIfNecessary(String virtualHostName, Map storeSettings) { - if(_stateManager.isInState(State.INITIAL)) + if (!_initialized) { - _stateManager.attainState(State.INITIALISING); + try + { + implementationSpecificConfiguration(virtualHostName, storeSettings); + } + catch (ClassNotFoundException e) + { + throw new StoreException("Cannot find driver class", e); + } + catch (SQLException e) + { + throw new StoreException("Unexpected exception occured", e); + } + _initialized =true; } + } - _virtualHost = virtualHost; - _tlogRecoveryHandler = tlogRecoveryHandler; - _messageRecoveryHandler = recoveryHandler; + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + { + _configurationStoreStateManager.attainState(State.ACTIVATING); - completeInitialisation(); + try + { + createOrOpenConfigurationStoreDatabase(); + recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); + loadConfiguredObjects(recoveryHandler); + setConfigVersion(recoveryHandler.completeConfigurationRecovery()); + } + catch (SQLException e) + { + throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); + } + _configurationStoreStateManager.attainState(State.ACTIVE); } - private void completeInitialisation() + @Override + public void openMessageStore(String virtualHostName, Map messageStoreSettings) { - commonConfiguration(); - - _stateManager.attainState(State.INITIALISED); + _messageStoreStateManager.attainState(State.INITIALISING); + initialiseIfNecessary(virtualHostName, messageStoreSettings); + _messageStoreStateManager.attainState(State.INITIALISED); } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { - if(_stateManager.isInState(State.INITIALISING)) + _messageStoreStateManager.attainState(State.ACTIVATING); + try { - completeInitialisation(); + createOrOpenMessageStoreDatabase(); + upgradeIfNecessary(); } - _stateManager.attainState(State.ACTIVATING); - - // this recovers durable exchanges, queues, and bindings - if(_configRecoveryHandler != null) + catch (SQLException e) { - recoverConfiguration(_configRecoveryHandler); + throw new StoreException("Unable to activate message store ", e); } - if(_messageRecoveryHandler != null) + if(messageRecoveryHandler != null) { try { - recoverMessages(_messageRecoveryHandler); + recoverMessages(messageRecoveryHandler); } catch (SQLException e) { @@ -262,11 +284,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC "persistent store ", e); } } - if(_tlogRecoveryHandler != null) + if(transactionLogRecoveryHandler != null) { try { - TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler); + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler); recoverXids(dtxrh); } catch (SQLException e) @@ -277,25 +299,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - _stateManager.attainState(State.ACTIVE); - } - - private void commonConfiguration() - { - try - { - implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost); - createOrOpenDatabase(); - upgradeIfNecessary(); - } - catch (ClassNotFoundException e) - { - throw new StoreException("Unable to configure message store ", e); - } - catch (SQLException e) - { - throw new StoreException("Unable to configure message store ", e); - } + _messageStoreStateManager.attainState(State.ACTIVE); } protected void upgradeIfNecessary() throws SQLException @@ -370,8 +374,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - protected abstract void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) throws ClassNotFoundException, SQLException; + protected abstract void implementationSpecificConfiguration(String name, Map messageStoreSettings) throws ClassNotFoundException, SQLException; abstract protected Logger getLogger(); @@ -381,13 +384,11 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC abstract protected String getSqlBigIntType(); - protected void createOrOpenDatabase() throws SQLException + protected void createOrOpenMessageStoreDatabase() throws SQLException { Connection conn = newAutoCommitConnection(); createVersionTable(conn); - createConfigVersionTable(conn); - createConfiguredObjectsTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); @@ -398,6 +399,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC conn.close(); } + protected void createOrOpenConfigurationStoreDatabase() throws SQLException + { + Connection conn = newAutoCommitConnection(); + + createConfigVersionTable(conn); + createConfiguredObjectsTable(conn); + + conn.close(); + } + private void createVersionTable(final Connection conn) throws SQLException { if(!tableExists(DB_VERSION_TABLE_NAME, conn)) @@ -645,21 +656,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) - { - try - { - recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); - loadConfiguredObjects(recoveryHandler); - - setConfigVersion(recoveryHandler.completeConfigurationRecovery()); - } - catch (SQLException e) - { - throw new StoreException("Error recovering persistent state: " + e.getMessage(), e); - } - } - private void setConfigVersion(int version) throws SQLException { Connection conn = newAutoCommitConnection(); @@ -722,16 +718,29 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void close() + public void closeMessageStore() { - if (_closed.compareAndSet(false, true)) - { - _stateManager.attainState(State.CLOSING); + _messageStoreStateManager.attainState(State.CLOSING); + if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL)) + { doClose(); + } + + _messageStoreStateManager.attainState(State.CLOSED); + } - _stateManager.attainState(State.CLOSED); + @Override + public void closeConfigurationStore() + { + _configurationStoreStateManager.attainState(State.CLOSING); + + if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL)) + { + doClose(); } + + _configurationStoreStateManager.attainState(State.CLOSED); } @@ -819,7 +828,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override public void create(UUID id, String type, Map attributes) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes)); } @@ -839,7 +848,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override public void update(UUID id, String type, Map attributes) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id); if (queueConfiguredObject != null) @@ -1153,11 +1162,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - protected boolean isConfigStoreOnly() - { - return _messageRecoveryHandler == null; - } - private static final class ConnectionWrapper { private final Connection _connection; @@ -1995,7 +1999,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { try { @@ -2141,7 +2145,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws StoreException { - if (_stateManager.isInState(State.ACTIVE)) + if (_configurationStoreStateManager.isInState(State.ACTIVE)) { try { @@ -2164,7 +2168,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { - if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING)) + if (_configurationStoreStateManager.isInState(State.ACTIVE) || _configurationStoreStateManager.isInState(State.ACTIVATING)) { try { @@ -2405,13 +2409,21 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC Connection conn = newAutoCommitConnection(); try { - for (String tableName : ALL_TABLES) + List tables = new ArrayList(); + tables.addAll(CONFIGURATION_STORE_TABLE_NAMES); + tables.addAll(MESSAGE_STORE_TABLE_NAMES); + + for (String tableName : tables) { Statement stmt = conn.createStatement(); try { stmt.execute("DROP TABLE " + tableName); } + catch(SQLException e) + { + getLogger().warn("Failed to drop table '" + tableName + "' :" + e); + } finally { stmt.close(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index d72bd1a5bf..afdeb257e0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -20,17 +20,15 @@ */ package org.apache.qpid.server.store; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; /** A simple message store that stores the messages in a thread-safe structure in memory. */ abstract public class AbstractMemoryMessageStore extends NullMessageStore { private final AtomicLong _messageId = new AtomicLong(1); - private final AtomicBoolean _closed = new AtomicBoolean(false); private static final Transaction IN_MEMORY_TRANSACTION = new Transaction() { @@ -80,30 +78,26 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _stateManager.attainState(State.INITIALISING); } @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { - if(_stateManager.isInState(State.INITIAL)) - { - _stateManager.attainState(State.INITIALISING); - } - _stateManager.attainState(State.INITIALISED); + } @Override - public void activate() + public void openMessageStore(String virtualHostName, Map messageStoreSettings) { + _stateManager.attainState(State.INITIALISING); + _stateManager.attainState(State.INITIALISED); + } - if(_stateManager.isInState(State.INITIALISING)) - { - _stateManager.attainState(State.INITIALISED); - } + @Override + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) + { _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); @@ -131,13 +125,10 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore } @Override - public void close() + public void closeMessageStore() { - if (_closed.compareAndSet(false, true)) - { - _stateManager.attainState(State.CLOSING); - _stateManager.attainState(State.CLOSED); - } + _stateManager.attainState(State.CLOSING); + _stateManager.attainState(State.CLOSED); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 5921abc324..10e56f1f71 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.store; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.VirtualHost; - public interface DurableConfigurationStore { String STORE_TYPE = "storeType"; @@ -38,16 +36,17 @@ public interface DurableConfigurationStore /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * - * - * - * - * - * @param virtualHost - * @param recoveryHandler Handler to be called as the store recovers on start up + * @param virtualHostName host name + * @param storeSettings store settings */ - void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler); + void openConfigurationStore(String virtualHostName, Map storeSettings) throws StoreException; + /** + * Recovers configuration from the store using given recovery handler + * + * @param recoveryHandler recovery handler + */ + void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException; /** * Makes the specified object persistent. @@ -86,8 +85,8 @@ public interface DurableConfigurationStore void update(UUID id, String type, Map attributes) throws StoreException; - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException; + void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException; - void close() throws Exception; + void closeConfigurationStore() throws StoreException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index 2f869201cf..a1512bbc22 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -67,12 +67,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override - public void configureConfigStore(final VirtualHost virtualHost, final ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _name = virtualHost.getName(); + _name = virtualHostName; - setup(virtualHost); + setup(storeSettings); load(); + } + + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + { recoveryHandler.beginConfigurationRecovery(this,_configVersion); List records = new ArrayList(_objectsById.values()); for(ConfiguredObjectRecord record : records) @@ -87,11 +92,8 @@ public class JsonFileConfigStore implements DurableConfigurationStore } } - private void setup(final VirtualHost virtualHost) + private void setup(final Map configurationStoreSettings) { - @SuppressWarnings("unchecked") - Map configurationStoreSettings = virtualHost.getConfigurationStoreSettings(); - Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH); if(!(storePathAttr instanceof String)) { @@ -510,12 +512,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore save(); } - public void close() throws Exception + @Override + public void closeConfigurationStore() { try { releaseFileLock(); } + catch (IOException e) + { + throw new StoreException("Failed to release lock", e); + } finally { _fileLock = null; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index b3a6216c84..fb99191477 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.model.VirtualHost; +import java.util.Map; /** * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages. @@ -34,21 +34,20 @@ public interface MessageStore String OVERFULL_SIZE = "storeOverfullSize"; /** - * Called after instantiation in order to configure the message store. A particular implementation can define + * Called after instantiation in order to open and initialize the message store. A particular implementation can define * whatever parameters it wants. - * - * - * - * - * @param virtualHost - * @param messageRecoveryHandler Handler to be called as the store recovers on start up - * @param tlogRecoveryHandler - * @throws Exception If any error occurs that means the store is unable to configure itself. + * @param virtualHostName virtual host name + * @param messageStoreSettings store settings */ - void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler); + void openMessageStore(String virtualHostName, Map messageStoreSettings); - void activate(); + /** + * Called after opening to recover messages and transactions with given recovery handlers + * + * @param messageRecoveryHandler + * @param transactionLogRecoveryHandler + */ + void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler); public StoredMessage addMessage(T metaData); @@ -64,10 +63,8 @@ public interface MessageStore /** * Called to close and cleanup any resources used by the message store. - * - * @throws Exception If the close fails. */ - void close(); + void closeMessageStore(); void addEventListener(EventListener eventListener, Event... events); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index c579a27731..c095675602 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -22,12 +22,15 @@ package org.apache.qpid.server.store; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.VirtualHost; - public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) + { + } + + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { } @@ -59,13 +62,17 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) + public void openMessageStore(String virtualHostName, Map messageStoreSettings) + { + } + + @Override + public void closeMessageStore() { } @Override - public void close() + public void closeConfigurationStore() { } @@ -88,7 +95,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 3ffa34f4fa..a14b1ad8c1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -68,6 +68,7 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.util.MapValueConverter; @@ -572,10 +573,10 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg //Close MessageStore if (getMessageStore() != null) { - //Remove MessageStore Interface should not throw Exception + // TODO Remove MessageStore Interface should not throw Exception try { - getMessageStore().close(); + getMessageStore().closeMessageStore(); } catch (Exception e) { @@ -584,14 +585,13 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } if (getDurableConfigurationStore() != null) { - //Remove MessageStore Interface should not throw Exception try { - getDurableConfigurationStore().close(); + getDurableConfigurationStore().closeConfigurationStore(); } - catch (Exception e) + catch (StoreException e) { - _logger.error("Failed to close message store", e); + _logger.error("Failed to close configuration store", e); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index efdca3b67d..2d824cbd2d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -20,16 +20,23 @@ */ package org.apache.qpid.server.virtualhost; +import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; + +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.UUID; + +import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; @@ -38,19 +45,38 @@ import org.apache.qpid.server.store.NonNullUpgrader; import org.apache.qpid.server.store.NullUpgrader; import org.apache.qpid.server.store.UpgraderProvider; -import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; - public class DefaultUpgraderProvider implements UpgraderProvider { + private static final Logger LOGGER = Logger.getLogger(DefaultUpgraderProvider.class); + public static final String EXCLUSIVE = "exclusive"; + public static final String NAME = "name"; private final ExchangeRegistry _exchangeRegistry; private final VirtualHost _virtualHost; + @SuppressWarnings("serial") + private static final Map DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap() + {{ + put("amq.direct", "direct"); + put("amq.topic", "topic"); + put("amq.fanout", "fanout"); + put("amq.match", "headers"); + }}); + + private final Map _defaultExchangeIds; + public DefaultUpgraderProvider(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry) { _virtualHost = virtualHost; _exchangeRegistry = exchangeRegistry; + Map defaultExchangeIds = new HashMap(); + for (String exchangeName : DEFAULT_EXCHANGES.keySet()) + { + UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, _virtualHost.getName()); + defaultExchangeIds.put(exchangeName, id); + } + _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); } public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer) @@ -66,6 +92,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); case 3: currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); + case 4: + currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader()); case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -131,6 +159,11 @@ public class DefaultUpgraderProvider implements UpgraderProvider } else { + if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) + { + return true; + } + return _exchangeRegistry.getExchange(exchangeId) != null && _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE; } @@ -209,6 +242,10 @@ public class DefaultUpgraderProvider implements UpgraderProvider private boolean unknownExchange(final String exchangeIdString) { UUID exchangeId = UUID.fromString(exchangeIdString); + if (_defaultExchangeIds.containsValue(exchangeId)) + { + return false; + } ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())) || _exchangeRegistry.getExchange(exchangeId) != null); @@ -310,4 +347,50 @@ public class DefaultUpgraderProvider implements UpgraderProvider } } + private class Version4Upgrader extends NonNullUpgrader + { + private Map _missingAmqpExchanges = new HashMap(DEFAULT_EXCHANGES); + + @Override + public void configuredObject(UUID id, String type, Map attributes) + { + if(Exchange.class.getSimpleName().equals(type)) + { + String name = (String)attributes.get(NAME); + _missingAmqpExchanges.remove(name); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + for (Entry entry : _missingAmqpExchanges.entrySet()) + { + String name = entry.getKey(); + String type = entry.getValue(); + UUID id = _defaultExchangeIds.get(name); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Creating amqp exchange " + name + " with id " + id); + } + + Map attributes = new HashMap(); + attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); + attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type); + + attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + + getUpdateMap().put(id, new ConfiguredObjectRecord(id, Exchange.class.getSimpleName(), attributes)); + + getNextUpgrader().configuredObject(id, Exchange.class.getSimpleName(), attributes); + + } + + getNextUpgrader().complete(); + } + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index be6cc52981..58a0e689cb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -92,14 +92,17 @@ public class StandardVirtualHost extends AbstractVirtualHost DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger()); - _durableConfigurationStore.configureConfigStore(virtualHost, configRecoverer); + _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), _durableConfigurationStore == _messageStore ? messageStoreSettings: configurationStoreSettings); - VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); - _messageStore.configureMessageStore(virtualHost, recoveryHandler, recoveryHandler); + _messageStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings()); + + _durableConfigurationStore.recoverConfigurationStore(configRecoverer); + // If store does not have entries for standard exchanges (amq.*), the following will create them. initialiseModel(); - _messageStore.activate(); + VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this); + _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); attainActivation(); } @@ -116,4 +119,5 @@ public class StandardVirtualHost extends AbstractVirtualHost return _durableConfigurationStore; } -} + +} \ No newline at end of file diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 76d460b296..b4dfbe837d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.File; import java.util.HashMap; @@ -41,17 +40,13 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; -import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; import org.mockito.ArgumentCaptor; @@ -69,15 +64,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private String _storePath; private String _storeName; - private MessageStore _messageStore; - private VirtualHost _virtualHost; private ConfigurationRecoveryHandler _recoveryHandler; - private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; - private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; - private TransactionLogRecoveryHandler _logRecoveryHandler; - private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; private ExchangeImpl _exchange = mock(ExchangeImpl.class); private static final String ROUTING_KEY = "routingKey"; @@ -86,40 +74,28 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private UUID _queueId; private UUID _exchangeId; private DurableConfigurationStore _configStore; - protected Map _messageStoreSettings; + protected Map _configurationStoreSettings; public void setUp() throws Exception { super.setUp(); - _messageStoreSettings = new HashMap(); + _configurationStoreSettings = new HashMap(); _queueId = UUIDGenerator.generateRandomUUID(); _exchangeId = UUIDGenerator.generateRandomUUID(); _storeName = getName(); _storePath = TMP_FOLDER + File.separator + _storeName; - _messageStoreSettings.put(MessageStore.STORE_PATH, _storePath); + _configurationStoreSettings.put(MessageStore.STORE_PATH, _storePath); FileUtils.delete(new File(_storePath), true); setTestSystemProperty("QPID_WORK", TMP_FOLDER); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); - _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); - _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); - _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); - _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - _virtualHost = mock(VirtualHost.class); - when(_virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); - - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); - when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - when(_exchange.getName()).thenReturn(EXCHANGE_NAME); + when(_exchange.getName()).thenReturn(EXCHANGE_NAME); when(_exchange.getId()).thenReturn(_exchangeId); when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); when(_exchange.getEventLogger()).thenReturn(new EventLogger()); - when(_virtualHost.getMessageStoreSettings()).thenReturn(_messageStoreSettings); _bindingArgs = new HashMap(); String argKey = AMQPFilterTypes.JMS_SELECTOR.toString(); @@ -133,7 +109,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { try { - closeMessageStore(); closeConfigStore(); FileUtils.delete(new File(_storePath), true); } @@ -446,123 +421,20 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private void reopenStore() throws Exception { - closeMessageStore(); closeConfigStore(); - _messageStore = createMessageStore(); _configStore = createConfigStore(); - _configStore.configureConfigStore(_virtualHost, _recoveryHandler); - _messageStore.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler); - _messageStore.activate(); + _configStore.openConfigurationStore("testName", _configurationStoreSettings); + _configStore.recoverConfigurationStore(_recoveryHandler); } - protected abstract MessageStore createMessageStore() throws Exception; protected abstract DurableConfigurationStore createConfigStore() throws Exception; - protected abstract void closeMessageStore() throws Exception; - protected abstract void closeConfigStore() throws Exception; - public void testRecordXid() throws Exception + protected void closeConfigStore() throws Exception { - Record enqueueRecord = getTestRecord(1); - Record dequeueRecord = getTestRecord(2); - Record[] enqueues = { enqueueRecord }; - Record[] dequeues = { dequeueRecord }; - byte[] globalId = new byte[] { 1 }; - byte[] branchId = new byte[] { 2 }; - - Transaction transaction = _messageStore.newTransaction(); - transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); - transaction.commitTran(); - reopenStore(); - verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - - transaction = _messageStore.newTransaction(); - transaction.removeXid(1l, globalId, branchId); - transaction.commitTran(); - - reopenStore(); - verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); - } - - private Record getTestRecord(long messageNumber) - { - UUID queueId1 = UUIDGenerator.generateRandomUUID(); - TransactionLogResource queue1 = mock(TransactionLogResource.class); - when(queue1.getId()).thenReturn(queueId1); - EnqueueableMessage message1 = mock(EnqueueableMessage.class); - when(message1.isPersistent()).thenReturn(true); - when(message1.getMessageNumber()).thenReturn(messageNumber); - final StoredMessage storedMessage = mock(StoredMessage.class); - when(storedMessage.getMessageNumber()).thenReturn(messageNumber); - when(message1.getStoredMessage()).thenReturn(storedMessage); - Record enqueueRecord = new TestRecord(queue1, message1); - return enqueueRecord; - } - - private static class TestRecord implements Record - { - private TransactionLogResource _queue; - private EnqueueableMessage _message; - - public TestRecord(TransactionLogResource queue, EnqueueableMessage message) - { - super(); - _queue = queue; - _message = message; - } - - @Override - public TransactionLogResource getResource() - { - return _queue; - } - - @Override - public EnqueueableMessage getMessage() + if (_configStore != null) { - return _message; + _configStore.closeConfigurationStore(); } - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); - result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (!(obj instanceof Record)) - { - return false; - } - Record other = (Record) obj; - if (_message == null && other.getMessage() != null) - { - return false; - } - if (_queue == null && other.getResource() != null) - { - return false; - } - if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) - { - return false; - } - return _queue.getId().equals(other.getResource().getId()); - } - } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java new file mode 100644 index 0000000000..8f2d0029f6 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase +{ + @Override + protected DurableConfigurationStore createConfigStore() throws Exception + { + return new JsonFileConfigStore(); + } + + @Override + public void testBindQueue() throws Exception + { + // TODO: Temporarily disable the test as it is already fixed on trunk + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index 96609ae992..5d2998de86 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -28,9 +28,10 @@ import java.util.UUID; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.TestFileUtils; +import org.apache.qpid.util.FileUtils; import org.mockito.InOrder; import static org.mockito.Matchers.any; @@ -46,36 +47,34 @@ import static org.mockito.Mockito.when; public class JsonFileConfigStoreTest extends QpidTestCase { private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - private VirtualHost _virtualHost; + private JsonFileConfigStore _store; private HashMap _configurationStoreSettings; + private String _virtualHostName; + private File _storeLocation; @Override public void setUp() throws Exception { super.setUp(); - removeStoreFile(); - _virtualHost = mock(VirtualHost.class); - when(_virtualHost.getName()).thenReturn(getName()); + _virtualHostName = getName(); + _storeLocation = TestFileUtils.createTestDirectory("json", true); _configurationStoreSettings = new HashMap(); _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE); - _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, TMP_FOLDER); - when(_virtualHost.getConfigurationStoreSettings()).thenReturn(_configurationStoreSettings); + _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath()); _store = new JsonFileConfigStore(); } @Override public void tearDown() throws Exception { - removeStoreFile(); - } - - private void removeStoreFile() - { - File file = new File(TMP_FOLDER, getName() + ".json"); - if(file.exists()) + try + { + super.tearDown(); + } + finally { - file.delete(); + FileUtils.delete(_storeLocation, true); } } @@ -84,7 +83,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, null); try { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); fail("Store should not successfully configure if there is no path set"); } catch (ServerScopedRuntimeException e) @@ -99,7 +98,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, System.getProperty("file.separator")); try { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); fail("Store should not successfully configure if there is an invalid path set"); } catch (ServerScopedRuntimeException e) @@ -110,12 +109,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testStartFromNoStore() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); InOrder inorder = inOrder(_recoveryHandler); inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); inorder.verify(_recoveryHandler,never()).configuredObject(any(UUID.class),anyString(),anyMap()); inorder.verify(_recoveryHandler).completeConfigurationRecovery(); - _store.close(); + _store.closeConfigurationStore(); } public void testUpdatedConfigVersionIsRetained() throws Exception @@ -123,10 +123,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase final int NEW_CONFIG_VERSION = 42; when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION); - _store.configureConfigStore(_virtualHost, _recoveryHandler); - _store.close(); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); + _store.closeConfigurationStore(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); InOrder inorder = inOrder(_recoveryHandler); // first time the config version should be the initial version - 0 @@ -135,27 +137,28 @@ public class JsonFileConfigStoreTest extends QpidTestCase // second time the config version should be the updated version inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION)); - _store.close(); + _store.closeConfigurationStore(); } public void testCreateObject() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); final Map queueAttr = Collections.singletonMap("name", (Object) "q1"); _store.create(queueId, queueType, queueAttr); - _store.close(); + _store.closeConfigurationStore(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); - _store.close(); + _store.closeConfigurationStore(); } public void testCreateAndUpdateObject() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); Map queueAttr = Collections.singletonMap("name", (Object) "q1"); @@ -167,17 +170,18 @@ public class JsonFileConfigStoreTest extends QpidTestCase queueAttr.put("owner", "theowner"); _store.update(queueId, queueType, queueAttr); - _store.close(); + _store.closeConfigurationStore(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(eq(queueId), eq(queueType), eq(queueAttr)); - _store.close(); + _store.closeConfigurationStore(); } public void testCreateAndRemoveObject() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); Map queueAttr = Collections.singletonMap("name", (Object) "q1"); @@ -187,16 +191,17 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.remove(queueId, queueType); - _store.close(); + _store.closeConfigurationStore(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); - _store.close(); + _store.closeConfigurationStore(); } public void testCreateUnknownObjectType() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); try { _store.create(UUID.randomUUID(), "wibble", Collections.emptyMap()); @@ -210,7 +215,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testTwoObjectsWithSameId() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID id = UUID.randomUUID(); _store.create(id, "Queue", Collections.emptyMap()); try @@ -227,11 +232,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testChangeTypeOfObject() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID id = UUID.randomUUID(); _store.create(id, "Queue", Collections.emptyMap()); - _store.close(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.closeConfigurationStore(); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); try { @@ -246,21 +251,21 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testLockFileGuaranteesExclusiveAccess() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); JsonFileConfigStore secondStore = new JsonFileConfigStore(); try { - secondStore.configureConfigStore(_virtualHost, _recoveryHandler); + secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings); fail("Should not be able to open a second store with the same path"); } catch(ServerScopedRuntimeException e) { // pass } - _store.close(); - secondStore.configureConfigStore(_virtualHost, _recoveryHandler); + _store.closeConfigurationStore(); + secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings); } @@ -268,7 +273,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testCreatedNestedObjects() throws Exception { - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final UUID queue2Id = new UUID(1, 1); @@ -290,14 +295,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.update(true, new ConfiguredObjectRecord(bindingId, "Binding", bindingAttributes), new ConfiguredObjectRecord(binding2Id, "Binding", binding2Attributes)); - _store.close(); - _store.configureConfigStore(_virtualHost, _recoveryHandler); + _store.closeConfigurationStore(); + _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(eq(queueId), eq("Queue"), eq(EMPTY_ATTR)); verify(_recoveryHandler).configuredObject(eq(queue2Id), eq("Queue"), eq(EMPTY_ATTR)); verify(_recoveryHandler).configuredObject(eq(exchangeId), eq("Exchange"), eq(EMPTY_ATTR)); verify(_recoveryHandler).configuredObject(eq(bindingId),eq("Binding"), eq(bindingAttributes)); verify(_recoveryHandler).configuredObject(eq(binding2Id),eq("Binding"), eq(binding2Attributes)); - _store.close(); + _store.closeConfigurationStore(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 908f3fe6e1..e46a8939f4 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -27,11 +27,11 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -49,7 +49,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple private UUID _transactionResource; protected abstract MessageStore createStore() throws Exception; - protected abstract VirtualHost createVirtualHost(String storeLocation); + protected abstract Map createStoreSettings(String storeLocation); protected abstract int getNumberOfMessagesToFillStore(); @Override @@ -61,14 +61,14 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple FileUtils.delete(_storeLocation, true); - VirtualHost vhost = createVirtualHost(_storeLocation.getAbsolutePath()); + Map storeSettings = createStoreSettings(_storeLocation.getAbsolutePath()); _store = createStore(); - ((DurableConfigurationStore)_store).configureConfigStore(vhost, null); + ((DurableConfigurationStore)_store).openConfigurationStore("test", storeSettings); MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); - _store.configureMessageStore(vhost, recoveryHandler, null); - _store.activate(); + _store.openMessageStore("test", storeSettings); + _store.recoverMessageStore(recoveryHandler, null); _transactionResource = UUID.randomUUID(); _events = new ArrayList(); @@ -87,7 +87,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple { if (_store != null) { - _store.close(); + _store.closeMessageStore(); } FileUtils.delete(_storeLocation, true); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 7ebfd54df6..45f7a2a39e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -23,50 +23,52 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; -import org.apache.qpid.server.model.VirtualHost; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; public abstract class MessageStoreTestCase extends QpidTestCase { - private ConfigurationRecoveryHandler _recoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - private VirtualHost _virtualHost; private MessageStore _store; + private Map _storeSettings; public void setUp() throws Exception { super.setUp(); - _recoveryHandler = mock(ConfigurationRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - _virtualHost = mock(VirtualHost.class); - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - setUpStoreConfiguration(_virtualHost); - when(_virtualHost.getName()).thenReturn(getTestName()); + _storeSettings = getStoreSettings(); _store = createMessageStore(); - ((DurableConfigurationStore)_store).configureConfigStore(_virtualHost, _recoveryHandler); - _store.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler); + _store.openMessageStore("test", _storeSettings); + _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); } - protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception; + protected abstract Map getStoreSettings() throws Exception; protected abstract MessageStore createMessageStore(); @@ -75,4 +77,116 @@ public abstract class MessageStoreTestCase extends QpidTestCase return _store; } + public void testRecordXid() throws Exception + { + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + byte[] globalId = new byte[] { 1 }; + byte[] branchId = new byte[] { 2 }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + reopenStore(); + verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + transaction = _store.newTransaction(); + transaction.removeXid(1l, globalId, branchId); + transaction.commitTran(); + + reopenStore(); + verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + } + + private void reopenStore() throws Exception + { + _store.closeMessageStore(); + + _store = createMessageStore(); + _store.openMessageStore("test", _storeSettings); + _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + } + private Record getTestRecord(long messageNumber) + { + UUID queueId1 = UUIDGenerator.generateRandomUUID(); + TransactionLogResource queue1 = mock(TransactionLogResource.class); + when(queue1.getId()).thenReturn(queueId1); + EnqueueableMessage message1 = mock(EnqueueableMessage.class); + when(message1.isPersistent()).thenReturn(true); + when(message1.getMessageNumber()).thenReturn(messageNumber); + final StoredMessage storedMessage = mock(StoredMessage.class); + when(storedMessage.getMessageNumber()).thenReturn(messageNumber); + when(message1.getStoredMessage()).thenReturn(storedMessage); + Record enqueueRecord = new TestRecord(queue1, message1); + return enqueueRecord; + } + + private static class TestRecord implements Record + { + private TransactionLogResource _queue; + private EnqueueableMessage _message; + + public TestRecord(TransactionLogResource queue, EnqueueableMessage message) + { + super(); + _queue = queue; + _message = message; + } + + @Override + public TransactionLogResource getResource() + { + return _queue; + } + + @Override + public EnqueueableMessage getMessage() + { + return _message; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); + result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof Record)) + { + return false; + } + Record other = (Record) obj; + if (_message == null && other.getMessage() != null) + { + return false; + } + if (_queue == null && other.getResource() != null) + { + return false; + } + if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) + { + return false; + } + return _queue.getId().equals(other.getResource().getId()); + } + + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 9f1f6f48c0..e3d56ab4bb 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -32,13 +32,16 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.FanoutExchange; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueFactory; @@ -59,19 +62,21 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; public class DurableConfigurationRecovererTest extends QpidTestCase { + private static final String VIRTUAL_HOST_NAME = "test"; private static final UUID QUEUE_ID = new UUID(0,0); - private static final UUID TOPIC_EXCHANGE_ID = new UUID(0,1); - private static final UUID DIRECT_EXCHANGE_ID = new UUID(0,2); + private static final UUID TOPIC_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(TopicExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME); + private static final UUID DIRECT_EXCHANGE_ID = UUIDGenerator.generateExchangeUUID(DirectExchange.TYPE.getDefaultExchangeName(), VIRTUAL_HOST_NAME); private static final String CUSTOM_EXCHANGE_NAME = "customExchange"; private DurableConfigurationRecoverer _durableConfigurationRecoverer; - private ExchangeImpl _directExchange; - private ExchangeImpl _topicExchange; + private ExchangeImpl _directExchange; + private ExchangeImpl _topicExchange; + private ExchangeImpl _matchExchange; + private ExchangeImpl _fanoutExchange; private VirtualHost _vhost; private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; @@ -83,21 +88,19 @@ public class DurableConfigurationRecovererTest extends QpidTestCase { super.setUp(); + _exchangeFactory = mock(ExchangeFactory.class); - _directExchange = mock(ExchangeImpl.class); - when(_directExchange.getExchangeType()).thenReturn(DirectExchange.TYPE); - - - _topicExchange = mock(ExchangeImpl.class); - when(_topicExchange.getExchangeType()).thenReturn(TopicExchange.TYPE); + _directExchange = createAndRegisterDefaultExchangeWithFactory(DirectExchange.TYPE); + _topicExchange = createAndRegisterDefaultExchangeWithFactory(TopicExchange.TYPE); + _matchExchange = createAndRegisterDefaultExchangeWithFactory(HeadersExchange.TYPE); + _fanoutExchange = createAndRegisterDefaultExchangeWithFactory(FanoutExchange.TYPE); - AMQQueue queue = mock(AMQQueue.class); + AMQQueue queue = mock(AMQQueue.class); _vhost = mock(VirtualHost.class); + when(_vhost.getName()).thenReturn(VIRTUAL_HOST_NAME); _exchangeRegistry = mock(ExchangeRegistry.class); - when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); - when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); @@ -165,7 +168,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } }); - _exchangeFactory = mock(ExchangeFactory.class); DurableConfiguredObjectRecoverer[] recoverers = { @@ -187,6 +189,19 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } + private ExchangeImpl createAndRegisterDefaultExchangeWithFactory(ExchangeType exchangeType) throws AMQUnknownExchangeType, UnknownExchangeException + { + ExchangeImpl exchange = mock(ExchangeImpl.class); + when(exchange.getExchangeType()).thenReturn(exchangeType); + Map directExchangeAttrsWithId = new HashMap(); + directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID(exchangeType.getDefaultExchangeName(), VIRTUAL_HOST_NAME)); + directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.DURABLE, true); + directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeType.getType()); + directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.NAME, exchangeType.getDefaultExchangeName()); + when(_exchangeFactory.restoreExchange(directExchangeAttrsWithId)).thenReturn(exchange); + return exchange; + } + public void testUpgradeEmptyStore() throws Exception { _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 0); @@ -275,12 +290,29 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable { Map arguments = attributesCaptor.getValue(); - if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) + String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME); + if(CUSTOM_EXCHANGE_NAME.equals(exchangeName) && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) - && customExchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID))) + && customExchangeId.equals((UUID) arguments.get(org.apache.qpid.server.model.Exchange.ID))) { return customExchange; } + else if ("amq.topic".equals(exchangeName)) + { + return _topicExchange; + } + else if ("amq.direct".equals(exchangeName)) + { + return _directExchange; + } + else if ("amq.fanout".equals(exchangeName)) + { + return _fanoutExchange; + } + else if ("amq.match".equals(exchangeName)) + { + return _matchExchange; + } else { return null; @@ -410,12 +442,29 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable { Map arguments = attributesCaptor.getValue(); - if(CUSTOM_EXCHANGE_NAME.equals(arguments.get(org.apache.qpid.server.model.Exchange.NAME)) + String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME); + if(CUSTOM_EXCHANGE_NAME.equals(exchangeName) && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE)) && exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID))) { return customExchange; } + else if ("amq.topic".equals(exchangeName)) + { + return _topicExchange; + } + else if ("amq.direct".equals(exchangeName)) + { + return _directExchange; + } + else if ("amq.fanout".equals(exchangeName)) + { + return _fanoutExchange; + } + else if ("amq.match".equals(exchangeName)) + { + return _matchExchange; + } else { return null; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index e31cb16d27..5872383702 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.ExchangeImpl; @@ -101,27 +102,6 @@ public class StandardVirtualHostTest extends QpidTestCase customBindingTestImpl(new String[0]); } - /** - * Tests that specifying custom routing keys for a queue in the configuration file results in failure - * to create the vhost (since this is illegal, only queue names are used with the default exchange) - */ - public void testSpecifyingCustomBindingForDefaultExchangeThrowsException() throws Exception - { - final String queueName = getName(); - final String customBinding = "custom-binding"; - writeConfigFile(queueName, queueName, null, false, new String[]{customBinding}); - - try - { - createVirtualHost(queueName); - fail("virtualhost creation should have failed due to illegal configuration"); - } - catch (IllegalConfigurationException e) - { - // pass - } - } - public void testVirtualHostBecomesActive() throws Exception { writeConfigFile(getName(), getName(), getName() +".direct", false, new String[0]); @@ -145,6 +125,7 @@ public class StandardVirtualHostTest extends QpidTestCase assertNotNull(vhost); assertEquals(State.ACTIVE, vhost.getState()); vhost.close(); + _virtualHostRegistry.unregisterVirtualHost(vhost); assertEquals(State.STOPPED, vhost.getState()); assertEquals(0, vhost.getHouseKeepingActiveCount()); } @@ -156,6 +137,7 @@ public class StandardVirtualHostTest extends QpidTestCase assertNotNull(host); assertEquals(State.ACTIVE, host.getState()); host.close(); + _virtualHostRegistry.unregisterVirtualHost(host); assertEquals(State.STOPPED, host.getState()); assertEquals(0, host.getHouseKeepingActiveCount()); } @@ -296,16 +278,11 @@ public class StandardVirtualHostTest extends QpidTestCase objectMapper.writeValue(new File(_storeFolder, vhostName + ".json"), data); JsonFileConfigStore store = new JsonFileConfigStore(); - org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class); - when(virtualHost.getName()).thenReturn(vhostName); Map configurationStoreSettings = new HashMap(); - when(virtualHost.getConfigurationStoreSettings()).thenReturn(configurationStoreSettings); configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE); configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _storeFolder.getAbsolutePath()); - ConfigurationRecoveryHandler recoveryHandler = mock(ConfigurationRecoveryHandler.class); - when(recoveryHandler.completeConfigurationRecovery()).thenReturn(org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION); - store.configureConfigStore(virtualHost , recoveryHandler ); + store.openConfigurationStore(vhostName, configurationStoreSettings); UUID exchangeId = UUIDGenerator.generateExchangeUUID(exchangeName == null? "amq.direct" : exchangeName, vhostName); if(exchangeName != null && !dontDeclare) @@ -355,7 +332,7 @@ public class StandardVirtualHostTest extends QpidTestCase } store.create(UUID.randomUUID(), org.apache.qpid.server.model.Binding.class.getSimpleName(), attributes); } - store.close(); + store.closeConfigurationStore(); } } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index f7b65075f7..2c2938cb01 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; @@ -125,28 +124,13 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa } @Override - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) + protected void implementationSpecificConfiguration(String name, Map messageStoreSettings) throws ClassNotFoundException { //Update to pick up QPID_WORK and use that as the default location not just derbyDB - - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); - Map configurationStoreSettings = virtualHost.getConfigurationStoreSettings(); _driverClass = (Class) Class.forName(SQL_DRIVER_NAME); - String databasePath = null; - if (isConfigStoreOnly()) - { - databasePath = (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH); - } - else - { - if (messageStoreSettings != null) - { - databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH); - } - } + String databasePath = (String) messageStoreSettings.get(MessageStore.STORE_PATH);; if(databasePath == null) { diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java index 4a1a3251e3..aaf65e9ee0 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java @@ -25,48 +25,10 @@ import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase; public class DerbyMessageStoreConfigurationTest extends AbstractDurableConfigurationStoreTestCase { - private DerbyMessageStore _derbyMessageStore; - - @Override - protected DerbyMessageStore createMessageStore() throws Exception - { - createStoreIfNecessary(); - return _derbyMessageStore; - } - - @Override - protected void closeMessageStore() throws Exception - { - closeStoreIfNecessary(); - } - - private void createStoreIfNecessary() - { - if(_derbyMessageStore == null) - { - _derbyMessageStore = new DerbyMessageStore(); - } - } - @Override protected DerbyMessageStore createConfigStore() throws Exception { - createStoreIfNecessary(); - return _derbyMessageStore; + return new DerbyMessageStore(); } - @Override - protected void closeConfigStore() throws Exception - { - closeStoreIfNecessary(); - } - - private void closeStoreIfNecessary() throws Exception - { - if (_derbyMessageStore != null) - { - _derbyMessageStore.close(); - _derbyMessageStore = null; - } - } } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index f23b5a3e23..ba7ae26292 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -20,14 +20,10 @@ */ package org.apache.qpid.server.store.derby; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; @@ -58,20 +54,15 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes } @Override - protected VirtualHost createVirtualHost(String storeLocation) + protected Map createStoreSettings(String storeLocation) { _logger.debug("Applying store specific config. overfull-size=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); - VirtualHost vhost = mock(VirtualHost.class); Map messageStoreSettings = new HashMap(); messageStoreSettings.put(MessageStore.STORE_PATH, storeLocation); messageStoreSettings.put(MessageStore.OVERFULL_SIZE, OVERFULL_SIZE); messageStoreSettings.put(MessageStore.UNDERFULL_SIZE, UNDERFULL_SIZE); - - when(vhost.getMessageStoreSettings()).thenReturn(messageStoreSettings ); - when(vhost.getName()).thenReturn("test"); - - return vhost; + return messageStoreSettings; } } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java index e3f91cc8fb..20de4ea339 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.server.store.derby; -import static org.mockito.Mockito.when; import java.io.File; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.util.FileUtils; @@ -53,7 +51,7 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase File location = new File(_storeLocation); assertTrue("Store does not exist at " + _storeLocation, location.exists()); - getStore().close(); + getStore().closeMessageStore(); assertTrue("Store does not exist at " + _storeLocation, location.exists()); getStore().onDelete(); @@ -61,13 +59,13 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase } @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + protected Map getStoreSettings() throws Exception { _storeLocation = TMP_FOLDER + File.separator + getTestName(); + deleteStoreIfExists(); Map messageStoreSettings = new HashMap(); messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation); - when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); - deleteStoreIfExists(); + return messageStoreSettings; } private void deleteStoreIfExists() diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 2c2b701c61..55c8d3ef79 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.MessageStore; @@ -259,18 +258,24 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag @Override protected void doClose() { - while(!_transactions.isEmpty()) - { - RecordedJDBCTransaction txn = _transactions.get(0); - txn.abortTran(); - } try { - _connectionProvider.close(); + while(!_transactions.isEmpty()) + { + RecordedJDBCTransaction txn = _transactions.get(0); + txn.abortTran(); + } } - catch (SQLException e) + finally { - throw new StoreException("Unable to close connection provider ", e); + try + { + _connectionProvider.close(); + } + catch (SQLException e) + { + throw new StoreException("Unable to close connection provider ", e); + } } } @@ -281,11 +286,9 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } - protected void implementationSpecificConfiguration(String name, - VirtualHost virtualHost) + protected void implementationSpecificConfiguration(String name, Map storeSettings) throws ClassNotFoundException, SQLException { - Map storeSettings = isConfigStoreOnly() ? virtualHost.getConfigurationStoreSettings() : virtualHost.getMessageStoreSettings(); String connectionURL = String.valueOf(storeSettings.get(CONNECTION_URL)); Object poolAttribute = storeSettings.get(CONNECTION_POOL); diff --git a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java index dc6bb0158d..2322fa7102 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java +++ b/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.store.jdbc; -import static org.mockito.Mockito.when; - import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; @@ -32,7 +30,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; @@ -55,21 +52,21 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase public void testOnDelete() throws Exception { - String[] expectedTables = JDBCMessageStore.ALL_TABLES; + Set expectedTables = JDBCMessageStore.MESSAGE_STORE_TABLE_NAMES; assertTablesExist(expectedTables, true); - getStore().close(); + getStore().closeMessageStore(); assertTablesExist(expectedTables, true); getStore().onDelete(); assertTablesExist(expectedTables, false); } @Override - protected void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception + protected Map getStoreSettings() { _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true"; Map messageStoreSettings = new HashMap(); messageStoreSettings.put(JDBCMessageStore.CONNECTION_URL, _connectionURL); - when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); + return messageStoreSettings; } @@ -79,7 +76,7 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase return new JDBCMessageStore(); } - private void assertTablesExist(String[] expectedTables, boolean exists) throws SQLException + private void assertTablesExist(Set expectedTables, boolean exists) throws SQLException { Set existingTables = getTableNames(); for (String tableName : expectedTables) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 7017ea6d45..06b7091a47 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.model.VirtualHost; public class QuotaMessageStore extends NullMessageStore @@ -49,9 +48,15 @@ public class } @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); + + } + + @Override + public void openMessageStore(String virtualHostName, Map messageStoreSettings) + { + _stateManager.attainState(State.INITIALISING); Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null ? Long.MAX_VALUE @@ -72,18 +77,11 @@ public class { _persistentSizeLowThreshold = _persistentSizeHighThreshold; } - _stateManager.attainState(State.INITIALISING); - } - - @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) - { _stateManager.attainState(State.INITIALISED); } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { _stateManager.attainState(State.ACTIVATING); _stateManager.attainState(State.ACTIVE); @@ -153,7 +151,7 @@ public class } @Override - public void close() + public void closeMessageStore() { if (_closed.compareAndSet(false, true)) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 84d1a9cb1c..44a5252355 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; import org.apache.qpid.server.plugin.MessageStoreFactory; public class SlowMessageStore implements MessageStore, DurableConfigurationStore @@ -47,51 +47,26 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore private HashMap _preDelays = new HashMap(); private HashMap _postDelays = new HashMap(); private long _defaultDelay = 0L; - private MessageStore _realStore = null; - private DurableConfigurationStore _durableConfigurationStore = null; + private MessageStore _realMessageStore = null; + private DurableConfigurationStore _realDurableConfigurationStore = null; private Map _eventListeners = new ConcurrentHashMap(); - // ***** MessageStore Interface. - @Override - public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler) + public void openConfigurationStore(String virtualHostName, Map storeSettings) { - _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName()); - - Map messageStoreSettings = virtualHost.getMessageStoreSettings(); - Object delaysAttr = messageStoreSettings.get(DELAYS); - - @SuppressWarnings({ "unchecked" }) - Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); - configureDelays(delays); - - final Object realStoreAttr = messageStoreSettings.get(REAL_STORE); - String messageStoreType = realStoreAttr == null ? MemoryMessageStore.TYPE : realStoreAttr.toString(); - - if (delays.containsKey(DEFAULT_DELAY)) - { - _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); - } - - _realStore = MessageStoreFactory.FACTORY_LOADER.get(messageStoreType).createMessageStore(); - - if (!_eventListeners.isEmpty()) - { - for (Iterator> it = _eventListeners.entrySet().iterator(); it.hasNext();) - { - Map.Entry entry = it.next(); - _realStore.addEventListener(entry.getKey(), entry.getValue()); - it.remove(); - } - } - - if (_realStore instanceof DurableConfigurationStore) + if (storeSettings != null && storeSettings.get(REAL_STORE) != null) { - _durableConfigurationStore = (DurableConfigurationStore)_realStore; - _durableConfigurationStore.configureConfigStore(virtualHost, recoveryHandler); + final String realStore = (String) storeSettings.get(REAL_STORE); + _realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore); + _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings); } + } + @Override + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) + { + _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler); } private void configureDelays(Map delays) @@ -159,31 +134,67 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler, - TransactionLogRecoveryHandler tlogRecoveryHandler) + public void openMessageStore(String virtualHostName, Map messageStoreSettings) { - _realStore.configureMessageStore(virtualHost, messageRecoveryHandler, tlogRecoveryHandler); + Object delaysAttr = messageStoreSettings.get(DELAYS); + + @SuppressWarnings({ "unchecked" }) + Map delays = (delaysAttr instanceof Map) ? (Map) delaysAttr : Collections.emptyMap(); + configureDelays(delays); + + if (delays.containsKey(DEFAULT_DELAY)) + { + _defaultDelay = Long.parseLong(String.valueOf(delays.get(DEFAULT_DELAY))); + } + + final Object realStoreAttr = messageStoreSettings.get(REAL_STORE) == null ? MemoryMessageStore.TYPE : messageStoreSettings.get(REAL_STORE); + final String realStore = (String) realStoreAttr; + _realMessageStore = MessageStoreFactory.FACTORY_LOADER.get(realStore).createMessageStore(); + + if (!_eventListeners.isEmpty()) + { + for (Iterator> it = _eventListeners.entrySet().iterator(); it.hasNext();) + { + Map.Entry entry = it.next(); + _realMessageStore.addEventListener(entry.getKey(), entry.getValue()); + it.remove(); + } + } + _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings); + + if (_realDurableConfigurationStore == null) + { + _realDurableConfigurationStore = (DurableConfigurationStore) _realMessageStore; + } + } @Override - public void close() + public void closeMessageStore() { doPreDelay("close"); - _realStore.close(); + _realMessageStore.closeMessageStore(); doPostDelay("close"); } + @Override + public void closeConfigurationStore() + { + _realDurableConfigurationStore.closeConfigurationStore(); + } + + @Override public StoredMessage addMessage(M metaData) { - return _realStore.addMessage(metaData); + return _realMessageStore.addMessage(metaData); } @Override public void create(UUID id, String type, Map attributes) throws StoreException { doPreDelay("create"); - _durableConfigurationStore.create(id, type, attributes); + _realDurableConfigurationStore.create(id, type, attributes); doPostDelay("create"); } @@ -191,7 +202,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void remove(UUID id, String type) throws StoreException { doPreDelay("remove"); - _durableConfigurationStore.remove(id, type); + _realDurableConfigurationStore.remove(id, type); doPostDelay("remove"); } @@ -199,7 +210,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public UUID[] removeConfiguredObjects(final UUID... objects) throws StoreException { doPreDelay("remove"); - UUID[] removed = _durableConfigurationStore.removeConfiguredObjects(objects); + UUID[] removed = _realDurableConfigurationStore.removeConfiguredObjects(objects); doPostDelay("remove"); return removed; } @@ -208,7 +219,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void update(UUID id, String type, Map attributes) throws StoreException { doPreDelay("update"); - _durableConfigurationStore.update(id, type, attributes); + _realDurableConfigurationStore.update(id, type, attributes); doPostDelay("update"); } @@ -216,7 +227,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException { doPreDelay("update"); - _durableConfigurationStore.update(createIfNecessary, records); + _realDurableConfigurationStore.update(createIfNecessary, records); doPostDelay("update"); } @@ -224,7 +235,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore public Transaction newTransaction() { doPreDelay("beginTran"); - Transaction txn = new SlowTransaction(_realStore.newTransaction()); + Transaction txn = new SlowTransaction(_realMessageStore.newTransaction()); doPostDelay("beginTran"); return txn; } @@ -232,7 +243,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore @Override public boolean isPersistent() { - return _realStore.isPersistent(); + return _realMessageStore.isPersistent(); } private class SlowTransaction implements Transaction @@ -299,28 +310,28 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void activate() + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { - _realStore.activate(); + _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler); } @Override public void addEventListener(EventListener eventListener, Event... events) { - if (_realStore == null) + if (_realMessageStore == null) { _eventListeners .put(eventListener, events); } else { - _realStore.addEventListener(eventListener, events); + _realMessageStore.addEventListener(eventListener, events); } } @Override public String getStoreLocation() { - return _realStore.getStoreLocation(); + return _realMessageStore.getStoreLocation(); } @Override @@ -332,7 +343,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore @Override public void onDelete() { - _realStore.onDelete(); + _realMessageStore.onDelete(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java new file mode 100644 index 0000000000..9f244e78a4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SplitStoreTest.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class SplitStoreTest extends QpidBrokerTestCase +{ + private String _messageStorePath; + private String _configStorePath; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + String virtualHostWorkDir = System.getProperty("QPID_WORK") + File.separator + TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST + File.separator; + _messageStorePath = virtualHostWorkDir + "messageStore"; + _configStorePath = virtualHostWorkDir + "configStore"; + } + + @Override + public void startBroker() throws Exception + { + // Overridden to prevent QBTC starting the Broker. + } + + public void testJsonConfigurationStoreWithPersistentMessageStore() throws Exception + { + Map configurationStoreSettings = new HashMap(); + configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, JsonFileConfigStore.TYPE); + configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath); + + doTest(configurationStoreSettings); + } + + public void testSeparateConfigurationAndMessageStoresOfTheSameType() throws Exception + { + Map configurationStoreSettings = new HashMap(); + configurationStoreSettings.put(DurableConfigurationStore.STORE_TYPE, getTestProfileMessageStoreType()); + configurationStoreSettings.put(DurableConfigurationStore.STORE_PATH, _configStorePath); + + doTest(configurationStoreSettings); + } + + private void configureAndStartBroker(Map configurationStoreSettings) throws Exception + { + Map messageStoreSettings = new HashMap(); + messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); + messageStoreSettings.put(MessageStore.STORE_PATH, _messageStorePath); + + TestBrokerConfiguration config = getBrokerConfiguration(); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); + config.setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONFIGURATION_STORE_SETTINGS, configurationStoreSettings); + + super.startBroker(); + } + + private void doTest(Map configurationStoreSettings) throws Exception + { + configureAndStartBroker(configurationStoreSettings); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getTestQueueName()); + session.createConsumer(queue).close(); // Create durable queue by side effect + sendMessage(session, queue, 2); + connection.close(); + + restartBroker(); + + setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + connection = getConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(1000); + session.commit(); + + assertNotNull("Message was not received after first restart", message); + assertEquals("Unexpected message received after first restart", 0, message.getIntProperty(INDEX)); + + stopBroker(); + File messageStoreFile = new File(_messageStorePath); + FileUtils.delete(messageStoreFile, true); + assertFalse("Store folder was not deleted", messageStoreFile.exists()); + super.startBroker(); + + connection = getConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(queue); + message = consumer.receive(500); + + assertNull("Message was received after store removal", message); + } + +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 6ce0936c08..6f7de94e5d 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -82,6 +82,7 @@ org.apache.qpid.server.AlertingTest#* // The C++ server has a totally different persistence mechanism org.apache.qpid.server.store.PersistentStoreTest#* +org.apache.qpid.server.store.SplitStoreTest#* // These tests are for the Java broker persistent store modules org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 2f96584589..0b060051e9 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -20,6 +20,7 @@ //These tests require a persistent store org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* org.apache.qpid.server.store.PersistentStoreTest#* +org.apache.qpid.server.store.SplitStoreTest#* org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithChanges org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod -- cgit v1.2.1