diff options
author | Alex Rudyy <orudyy@apache.org> | 2014-03-31 15:37:34 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2014-03-31 15:37:34 +0000 |
commit | 6de63bce6e61011a921337d9c3880e199f21c94c (patch) | |
tree | 539f0c2471e185edb5eb2ed5a14d5c69b0ef42db | |
parent | 4e8e65d2a2569aee45f20a09f4f34c6abcf81f59 (diff) | |
download | qpid-python-6de63bce6e61011a921337d9c3880e199f21c94c.tar.gz |
QPID-5653: Open databases and upgrade on opening of configuration/message stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1583351 13f79535-47bb-0310-9956-ffa450edef68
27 files changed, 290 insertions, 175 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java index f13e4dd08b..a5eac25968 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java @@ -104,4 +104,11 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord result = 31 * result + (_type != null ? _type.hashCode() : 0); return result; } + + @Override + public String toString() + { + return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]"; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 4fe41d512a..aae0a56a40 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -1,4 +1,3 @@ -package org.apache.qpid.server.store.berkeleydb; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,7 +18,9 @@ package org.apache.qpid.server.store.berkeleydb; * under the License. * */ +package org.apache.qpid.server.store.berkeleydb; +import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; @@ -65,10 +66,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost _messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory()); getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED()); - Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings(); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(virtualHost.getMessageStoreSettings()); + messageStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true); - _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings); - _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings); + _messageStore.openConfigurationStore(virtualHost, messageStoreSettings); + _messageStore.openMessageStore(virtualHost, messageStoreSettings); getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); @@ -96,15 +98,17 @@ public class BDBHAVirtualHost extends AbstractVirtualHost { _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true); + DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this); + DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger()); - _messageStore.recoverConfigurationStore(getModel(), configRecoverer); + upgraderProvider, getEventLogger()); + _messageStore.recoverConfigurationStore(configRecoverer); initialiseModel(); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject()); - _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); + _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); attainActivation(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9f8e5410a7..8aac9a6247 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; @@ -69,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding; import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding; import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.util.FileUtils; import com.sleepycat.bind.tuple.ByteBinding; @@ -152,39 +154,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { if (_configurationStoreOpen.compareAndSet(false, true)) { if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, storeSettings); + String[] databaseNames = null; + if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false)) + { + databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length]; + System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length); + System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length); + } + else + { + databaseNames = CONFIGURATION_STORE_DATABASE_NAMES; + } + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames)); + } + else + { + throw new IllegalStateException("The database have been already opened as message store"); } } } @Override - public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { checkConfigurationStoreOpen(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - try - { - new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary(); - _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot configure store", e); - } + recoverConfig(recoveryHandler); } @Override - public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException { if (_messageStoreOpen.compareAndSet(false, true)) { @@ -204,34 +210,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore if (_environmentFacade == null) { - _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, messageStoreSettings); + _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask()); } - _committer = _environmentFacade.createCommitter(virtualHostName); + _committer = _environmentFacade.createCommitter(parent.getName()); _committer.start(); - } } @Override - public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException + public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException { checkMessageStoreOpen(); - DatabaseConfig dbConfig = new DatabaseConfig(); - dbConfig.setTransactional(true); - dbConfig.setAllowCreate(true); - try - { - new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary(); - _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES); - _totalStoreSize = getSizeOnDisk(); - } - catch(DatabaseException e) - { - throw _environmentFacade.handleDatabaseException("Cannot upgrade message store or open datatbases", e); - } - if(messageRecoveryHandler != null) { recoverMessages(messageRecoveryHandler); @@ -1843,4 +1834,72 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore return _environmentFacade.getOpenDatabase(XID_DB_NAME); } + class UpgradeTask implements EnvironmentFacadeTask + { + + private ConfiguredObject<?> _parent; + + public UpgradeTask(ConfiguredObject<?> parent) + { + _parent = parent; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot upgrade store", e); + } + } + } + + class OpenDatabasesTask implements EnvironmentFacadeTask + { + private String[] _names; + + public OpenDatabasesTask(String[] names) + { + _names = names; + } + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + DatabaseConfig dbConfig = new DatabaseConfig(); + dbConfig.setTransactional(true); + dbConfig.setAllowCreate(true); + facade.openDatabases(dbConfig, _names); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot open databases", e); + } + } + + } + + class DiskSpaceTask implements EnvironmentFacadeTask + { + + @Override + public void execute(EnvironmentFacade facade) + { + try + { + _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize(); + } + catch(DatabaseException e) + { + throw facade.handleDatabaseException("Cannot evaluate disk store size", e); + } + } + + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java index fd064d9b0e..2e02a6cfed 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java @@ -26,8 +26,12 @@ public interface EnvironmentFacadeFactory { public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig"; - EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> storeSettings); + EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks); String getType(); + public static interface EnvironmentFacadeTask + { + void execute(EnvironmentFacade facade); + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 8117ca1a9a..6065be5fa9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -42,7 +43,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade private Environment _environment; - public StandardEnvironmentFacade(String storePath, Map<String, String> attributes) + public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks) { _storePath = storePath; @@ -74,6 +75,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); _environment = new Environment(environmentPath, envConfig); + if (initialisationTasks != null) + { + for (EnvironmentFacadeTask task : initialisationTasks) + { + task.execute(this); + } + } } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index cc38b799a6..9506b1c20a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -20,11 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; -import java.io.File; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.store.MessageStore; public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -32,25 +30,18 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor @SuppressWarnings("unchecked") @Override - public EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> messageStoreSettings) + public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) { Map<String, String> envConfigMap = new HashMap<String, String>(); envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS); - final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + virtualHostName; - Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION); if (environmentConfigurationAttributes instanceof Map) { envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes); } String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH); - if(storeLocation == null) - { - storeLocation = defaultPath; - } - - return new StandardEnvironmentFacade(storeLocation, envConfigMap); + return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks); } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 3e15e9bdcc..b8192ea741 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -40,12 +40,14 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.Committer; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; +import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; import org.apache.qpid.server.util.DaemonThreadFactory; @@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan */ put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); /** - * Parameter changed from default (off) to allow the Environment to start in the + * Parameter changed from default (off) to allow the Environment to start in the * UNKNOWN state when the majority is not available. */ put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s"); @@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) + private AtomicBoolean _initialised; + private EnvironmentFacadeTask[] _initialisationTasks; + + public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks) { _environmentDirectory = new File(configuration.getStorePath()); if (!_environmentDirectory.exists()) @@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + _initialised = new AtomicBoolean(); + _initialisationTasks = initialisationTasks; _configuration = configuration; _durability = Durability.parse(_configuration.getDurability()); @@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); _joinTime = System.currentTimeMillis(); } + if (state == ReplicatedEnvironment.State.MASTER) { - reopenDatabases(); + onMasterStateChange(); } } @@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _lastKnownEnvironmentState = state; } + private void onMasterStateChange() + { + reopenDatabases(); + + if (_initialised.compareAndSet(false, true)) + { + if (_initialisationTasks != null) + { + for (EnvironmentFacadeTask task : _initialisationTasks) + { + task.execute(ReplicatedEnvironmentFacade.this); + } + } + } + } + private void reopenDatabases() { if (_state.get() == State.OPEN) @@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { nodeState = ReplicatedEnvironment.State.UNKNOWN; } - + currentGroupState.put(node.getName(), nodeState); return null; } @@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index c6b3e48cf8..8216cfc484 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -47,7 +47,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact private static final boolean DEFAULT_COALESCING_SYNC = true; @Override - public EnvironmentFacade createEnvironmentFacade(String virtualHostName, final Map<String, Object> messageStoreSettings) + public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks) { ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration() { @@ -126,7 +126,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact return durability == null ? DEFAULT_DURABILITY.toString() : durability; } }; - return new ReplicatedEnvironmentFacade(configuration); + return new ReplicatedEnvironmentFacade(configuration, initialisationTasks); } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java index b19e18b204..a82bb066e2 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java @@ -122,7 +122,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase EnvironmentFacade createEnvironmentFacade() { - return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap()); + return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index cd7dd69c46..b342493c59 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -31,8 +31,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.qpid.server.configuration.updater.TaskExecutor; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -65,16 +63,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private File _storePath; private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); - private VirtualHost _virtualHost = mock(VirtualHost.class); public void setUp() throws Exception { super.setUp(); - TaskExecutor taskExecutor = mock(TaskExecutor.class); - when(taskExecutor.isTaskExecutorThread()).thenReturn(true); - when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); - _storePath = TestFileUtils.createTestDirectory("bdb", true); setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); @@ -302,7 +295,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase State desiredState, StateChangeListener stateChangeListener) { ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null); ref.setStateChangeListener(stateChangeListener); _nodes.put(nodeName, ref); return ref; diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index ba84d0682a..465c49e0c4 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -242,9 +242,9 @@ public class BDBMessageStoreTest extends MessageStoreTest MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); VirtualHost<?> virtualHost = getVirtualHostModel(); - newStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings()); + newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings()); - newStore.recoverMessageStore(getVirtualHostModel(), recoveryHandler, null); + newStore.recoverMessageStore(recoveryHandler, null); return newStore; } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 2e7bccf8f3..921eb916ea 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -362,7 +362,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase /** * - * TODO Raise Jira and resolve so this test can be reenabled. + * TODO (QPID-5650) Resolve so this test can be reenabled. * * Test that the queue configured to have a DLQ was recovered and has the alternate exchange * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 28ac79075e..e7b6adaf7a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -183,11 +183,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC @Override - public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { if (_configurationStoreOpen.compareAndSet(false, true)) { - initialiseIfNecessary(virtualHostName, storeSettings); + initialiseIfNecessary(parent.getName(), storeSettings); + try + { + createOrOpenConfigurationStoreDatabase(); + upgradeIfVersionTableExists(parent); + } + catch(SQLException e) + { + throw new StoreException("Cannot create databases or upgrade", e); + } } } @@ -212,14 +221,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { checkConfigurationStoreOpen(); try { - createOrOpenConfigurationStoreDatabase(); - upgradeIfVersionTableExists(parent); recoveryHandler.beginConfigurationRecovery(this, getConfigVersion()); loadConfiguredObjects(recoveryHandler); setConfigVersion(recoveryHandler.completeConfigurationRecovery()); @@ -266,27 +273,28 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { if (_messageStoreOpen.compareAndSet(false, true)) { - initialiseIfNecessary(virtualHostName, messageStoreSettings); + initialiseIfNecessary(parent.getName(), messageStoreSettings); + try + { + createOrOpenMessageStoreDatabase(); + upgradeIfNecessary(parent); + } + catch (SQLException e) + { + throw new StoreException("Unable to activate message store ", e); + } } } @Override - public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { checkMessageStoreOpen(); - try - { - createOrOpenMessageStoreDatabase(); - upgradeIfNecessary(parent); - } - catch (SQLException e) - { - throw new StoreException("Unable to activate message store ", e); - } + if(messageRecoveryHandler != null) { try diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java index 84f24df1cc..5975bf58b3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -32,7 +32,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 918a9b0134..624c451df6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -29,6 +29,8 @@ public interface DurableConfigurationStore { String STORE_TYPE = "storeType"; String STORE_PATH = "storePath"; + String IS_MESSAGE_STORE_TOO = "isMessageStoreToo"; + public static interface Source { @@ -38,17 +40,16 @@ public interface DurableConfigurationStore /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * @param virtualHostName host name + * @param parent host name * @param storeSettings store settings */ - void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) throws StoreException; + void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException; /** * Recovers configuration from the store using given recovery handler - * @param parent parent * @param recoveryHandler recovery handler */ - void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) throws StoreException; + void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException; /** * Makes the specified object persistent. @@ -81,6 +82,6 @@ public interface DurableConfigurationStore */ void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException; - void closeConfigurationStore() throws StoreException; + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index 97559c8286..0eca9adda0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -89,16 +89,16 @@ public class JsonFileConfigStore implements DurableConfigurationStore } @Override - public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { - _name = virtualHostName; + _name = parent.getName(); setup(storeSettings); load(); } @Override - public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { recoveryHandler.beginConfigurationRecovery(this,_configVersion); List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index b6b65087a4..69f9073f6e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -38,18 +38,17 @@ public interface MessageStore /** * Called after instantiation in order to open and initialize the message store. A particular implementation can define * whatever parameters it wants. - * @param virtualHostName virtual host name + * @param parent virtual host name * @param messageStoreSettings store settings */ - void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings); + void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings); /** * Called after opening to recover messages and transactions with given recovery handlers - * @param parent TODO * @param messageRecoveryHandler * @param transactionLogRecoveryHandler */ - void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler); + void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler); public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 66c3fe6cae..59b4530014 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -26,13 +26,14 @@ import org.apache.qpid.server.model.ConfiguredObject; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore { + @Override - public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { } @Override - public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { } @@ -58,7 +59,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { } @@ -91,7 +92,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { } @@ -110,4 +111,5 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura public void onDelete() { } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index f6fa6344d3..7e0562afec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -30,7 +30,6 @@ import java.util.Map.Entry; import java.util.UUID; import org.apache.log4j.Logger; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; @@ -52,7 +51,6 @@ public class DefaultUpgraderProvider implements UpgraderProvider public static final String EXCLUSIVE = "exclusive"; public static final String NAME = "name"; - private final ExchangeRegistry _exchangeRegistry; private final VirtualHost _virtualHost; @SuppressWarnings("serial") @@ -66,11 +64,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider private final Map<String, UUID> _defaultExchangeIds; - public DefaultUpgraderProvider(final VirtualHost virtualHost, - final ExchangeRegistry exchangeRegistry) + public DefaultUpgraderProvider(final VirtualHost virtualHost) { _virtualHost = virtualHost; - _exchangeRegistry = exchangeRegistry; Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) { @@ -150,7 +146,12 @@ public class DefaultUpgraderProvider implements UpgraderProvider private boolean isTopicExchange(ConfiguredObjectRecord entry) { - UUID exchangeId = entry.getParents().get("Exchange").getId(); + ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); + if (exchangeRecord == null) + { + return false; + } + UUID exchangeId = exchangeRecord.getId(); if(_records.containsKey(exchangeId)) { @@ -165,8 +166,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider return true; } - return _exchangeRegistry.getExchange(exchangeId) != null - && _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE; + return _virtualHost.getExchange(exchangeId) != null + && _virtualHost.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE; } } @@ -253,7 +254,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider } ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())) - || _exchangeRegistry.getExchange(exchangeId) != null); + || _virtualHost.getExchange(exchangeId) != null); } private boolean unknownQueue(final UUID queueId) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 6b75c39c49..e3fd938225 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -1,4 +1,4 @@ -package org.apache.qpid.server.virtualhost;/* +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,7 +18,9 @@ package org.apache.qpid.server.virtualhost;/* * under the License. * */ +package org.apache.qpid.server.virtualhost; +import java.util.HashMap; import java.util.Map; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; @@ -27,6 +29,7 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.stats.StatisticsGatherer; + import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreCreator; @@ -83,18 +86,21 @@ public class StandardVirtualHost extends AbstractVirtualHost String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE); _durableConfigurationStore = initialiseConfigurationStore(configurationStoreType); boolean combinedStores = _durableConfigurationStore == _messageStore; + if (combinedStores) + { + configurationStoreSettings = new HashMap<String,Object>(messageStoreSettings); + configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true); + } + if (!combinedStores) { _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName()); getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED()); } - DurableConfigurationRecoverer configRecoverer = - new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), - new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger()); - _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), combinedStores ? messageStoreSettings: configurationStoreSettings); + _durableConfigurationStore.openConfigurationStore(virtualHost, configurationStoreSettings); - _messageStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings()); + _messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings()); getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation())); @@ -103,13 +109,16 @@ public class StandardVirtualHost extends AbstractVirtualHost getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString())); } - _durableConfigurationStore.recoverConfigurationStore(getModel(), configRecoverer); + DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(), + new DefaultUpgraderProvider(this), getEventLogger()); + + _durableConfigurationStore.recoverConfigurationStore(configRecoverer); // If store does not have entries for standard exchanges (amq.*), the following will create them. initialiseModel(); VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject()); - _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler); + _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler); attainActivation(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index fbd26208d8..83052110a1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -488,8 +488,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest closeConfigStore(); _configStore = createConfigStore(); - _configStore.openConfigurationStore("testName", _configurationStoreSettings); - _configStore.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + ConfiguredObject<?> parent = mock(ConfiguredObject.class); + when(parent.getName()).thenReturn("testName"); + _configStore.openConfigurationStore(parent, _configurationStoreSettings); + _configStore.recoverConfigurationStore(_recoveryHandler); } protected abstract DurableConfigurationStore createConfigStore() throws Exception; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index bca16b6e70..1de24e371d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -37,8 +36,6 @@ import org.mockito.ArgumentMatcher; import org.mockito.InOrder; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; @@ -53,7 +50,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase private JsonFileConfigStore _store; private HashMap<String, Object> _configurationStoreSettings; - private String _virtualHostName; + private ConfiguredObject<?> _virtualHost; private File _storeLocation; @@ -64,7 +61,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void setUp() throws Exception { super.setUp(); - _virtualHostName = getName(); + + _virtualHost = mock(ConfiguredObject.class); + when(_virtualHost.getName()).thenReturn(getName()); _storeLocation = TestFileUtils.createTestDirectory("json", true); _configurationStoreSettings = new HashMap<String, Object>(); _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE); @@ -90,7 +89,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, null); try { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); fail("Store should not successfully configure if there is no path set"); } catch (ServerScopedRuntimeException e) @@ -105,7 +104,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, System.getProperty("file.separator")); try { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); fail("Store should not successfully configure if there is an invalid path set"); } catch (ServerScopedRuntimeException e) @@ -116,8 +115,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testStartFromNoStore() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); InOrder inorder = inOrder(_recoveryHandler); inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class)); @@ -130,12 +129,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase final int NEW_CONFIG_VERSION = 42; when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); InOrder inorder = inOrder(_recoveryHandler); // first time the config version should be the initial version - 0 @@ -149,7 +148,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testCreateObject() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); @@ -157,15 +156,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } public void testCreateAndUpdateObject() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); @@ -179,8 +178,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } @@ -188,7 +187,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testCreateAndRemoveObject() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final String queueType = Queue.class.getSimpleName(); Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1"); @@ -201,15 +200,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); _store.closeConfigurationStore(); } public void testCreateUnknownObjectType() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); try { _store.create(new ConfiguredObjectRecordImpl(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap())); @@ -223,7 +222,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testTwoObjectsWithSameId() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID id = UUID.randomUUID(); _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap())); try @@ -240,11 +239,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testChangeTypeOfObject() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID id = UUID.randomUUID(); _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap())); _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); try { @@ -259,13 +258,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testLockFileGuaranteesExclusiveAccess() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); JsonFileConfigStore secondStore = new JsonFileConfigStore(); try { - secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + secondStore.openConfigurationStore(_virtualHost, _configurationStoreSettings); fail("Should not be able to open a second store with the same path"); } catch(ServerScopedRuntimeException e) @@ -273,7 +272,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase // pass } _store.closeConfigurationStore(); - secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + secondStore.openConfigurationStore(_virtualHost, _configurationStoreSettings); } @@ -281,7 +280,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase public void testCreatedNestedObjects() throws Exception { - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); final UUID queueId = new UUID(0, 1); final UUID queue2Id = new UUID(1, 1); @@ -311,8 +310,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents); _store.update(true, bindingRecord, binding2Record); _store.closeConfigurationStore(); - _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings); - _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler); + _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); + _store.recoverConfigurationStore(_recoveryHandler); verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR)); verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index fc69a53c85..451a2744c3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -33,7 +33,6 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -69,8 +68,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); - _store.openMessageStore("test", storeSettings); - _store.recoverMessageStore(mock(ConfiguredObject.class), recoveryHandler, null); + ConfiguredObject<?> parent = mock(ConfiguredObject.class); + when(parent.getName()).thenReturn("test"); + _store.openMessageStore(parent, storeSettings); + _store.recoverMessageStore(recoveryHandler, null); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 1d95133784..51d3fc15d2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -46,11 +46,15 @@ public abstract class MessageStoreTestCase extends QpidTestCase private MessageStore _store; private Map<String, Object> _storeSettings; + private ConfiguredObject<?> _parent; public void setUp() throws Exception { super.setUp(); + _parent = mock(ConfiguredObject.class); + when(_parent.getName()).thenReturn("test"); + _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -65,8 +69,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase _store = createMessageStore(); - _store.openMessageStore("test", _storeSettings); - _store.recoverMessageStore(mock(ConfiguredObject.class), _messageStoreRecoveryHandler, _logRecoveryHandler); + _store.openMessageStore(_parent, _storeSettings); + _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); } protected abstract Map<String, Object> getStoreSettings() throws Exception; @@ -106,8 +110,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase _store.closeMessageStore(); _store = createMessageStore(); - _store.openMessageStore("test", _storeSettings); - _store.recoverMessageStore(mock(ConfiguredObject.class), _messageStoreRecoveryHandler, _logRecoveryHandler); + _store.openMessageStore(_parent, _storeSettings); + _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); } private Record getTestRecord(long messageNumber) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index c90c6af220..f6a251a691 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -184,7 +184,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } _durableConfigurationRecoverer = new DurableConfigurationRecoverer(_vhost.getName(), recovererMap, - new DefaultUpgraderProvider(_vhost, _exchangeRegistry), new EventLogger()); + new DefaultUpgraderProvider(_vhost), new EventLogger()); _store = mock(DurableConfigurationStore.class); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index f20ddbc367..406a20d557 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.model.ConfiguredObject; public class QuotaMessageStore extends NullMessageStore { @@ -40,7 +41,7 @@ public class QuotaMessageStore extends NullMessageStore @Override - public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE); _persistentSizeHighThreshold = overfullAttr == null diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index 8ec576c7ca..e20196c98d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -53,20 +53,20 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>(); @Override - public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) { if (storeSettings != null && storeSettings.get(REAL_STORE) != null) { final String realStore = (String) storeSettings.get(REAL_STORE); _realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore); - _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings); + _realDurableConfigurationStore.openConfigurationStore(parent, storeSettings); } } @Override - public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) + public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) { - _realDurableConfigurationStore.recoverConfigurationStore(parent, recoveryHandler); + _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler); } private void configureDelays(Map<String, Object> delays) @@ -134,7 +134,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) + public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { Object delaysAttr = messageStoreSettings.get(DELAYS); @@ -160,7 +160,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore it.remove(); } } - _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings); + _realMessageStore.openMessageStore(parent, messageStoreSettings); if (_realDurableConfigurationStore == null) { @@ -294,9 +294,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) + public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) { - _realMessageStore.recoverMessageStore(parent, messageRecoveryHandler, transactionLogRecoveryHandler); + _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler); } @Override |