summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-03-31 15:37:34 +0000
committerAlex Rudyy <orudyy@apache.org>2014-03-31 15:37:34 +0000
commit6de63bce6e61011a921337d9c3880e199f21c94c (patch)
tree539f0c2471e185edb5eb2ed5a14d5c69b0ef42db
parent4e8e65d2a2569aee45f20a09f4f34c6abcf81f59 (diff)
downloadqpid-python-6de63bce6e61011a921337d9c3880e199f21c94c.tar.gz
QPID-5653: Open databases and upgrade on opening of configuration/message stores
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1583351 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java7
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java18
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java127
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java33
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java4
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java2
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java9
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java4
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java1
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java25
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java63
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java16
27 files changed, 290 insertions, 175 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
index f13e4dd08b..a5eac25968 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java
@@ -104,4 +104,11 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord
result = 31 * result + (_type != null ? _type.hashCode() : 0);
return result;
}
+
+ @Override
+ public String toString()
+ {
+ return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]";
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 4fe41d512a..aae0a56a40 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.store.berkeleydb;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,9 @@ package org.apache.qpid.server.store.berkeleydb;
* under the License.
*
*/
+package org.apache.qpid.server.store.berkeleydb;
+import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -65,10 +66,11 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
_messageStore = new BDBMessageStore(new ReplicatedEnvironmentFacadeFactory());
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
- Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>(virtualHost.getMessageStoreSettings());
+ messageStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
- _messageStore.openConfigurationStore(virtualHost.getName(), messageStoreSettings);
- _messageStore.openMessageStore(virtualHost.getName(), messageStoreSettings);
+ _messageStore.openConfigurationStore(virtualHost, messageStoreSettings);
+ _messageStore.openMessageStore(virtualHost, messageStoreSettings);
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
@@ -96,15 +98,17 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
+ DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
+
DurableConfigurationRecoverer configRecoverer =
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(BDBHAVirtualHost.this, getExchangeRegistry()), getEventLogger());
- _messageStore.recoverConfigurationStore(getModel(), configRecoverer);
+ upgraderProvider, getEventLogger());
+ _messageStore.recoverConfigurationStore(configRecoverer);
initialiseModel();
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
attainActivation();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9f8e5410a7..8aac9a6247 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.util.FileUtils;
import com.sleepycat.bind.tuple.ByteBinding;
@@ -152,39 +154,43 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, storeSettings);
+ String[] databaseNames = null;
+ if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
+ {
+ databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+ System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
+ System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+ }
+ else
+ {
+ databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+ }
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+ }
+ else
+ {
+ throw new IllegalStateException("The database have been already opened as message store");
}
}
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
checkConfigurationStoreOpen();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, CONFIGURATION_STORE_DATABASE_NAMES);
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot configure store", e);
- }
+
recoverConfig(recoveryHandler);
}
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings) throws StoreException
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) throws StoreException
{
if (_messageStoreOpen.compareAndSet(false, true))
{
@@ -204,34 +210,19 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
if (_environmentFacade == null)
{
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(virtualHostName, messageStoreSettings);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
}
- _committer = _environmentFacade.createCommitter(virtualHostName);
+ _committer = _environmentFacade.createCommitter(parent.getName());
_committer.start();
-
}
}
@Override
- public synchronized void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
+ public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
{
checkMessageStoreOpen();
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- try
- {
- new Upgrader(_environmentFacade.getEnvironment(), parent).upgradeIfNecessary();
- _environmentFacade.openDatabases(dbConfig, MESSAGE_STORE_DATABASE_NAMES);
- _totalStoreSize = getSizeOnDisk();
- }
- catch(DatabaseException e)
- {
- throw _environmentFacade.handleDatabaseException("Cannot upgrade message store or open datatbases", e);
- }
-
if(messageRecoveryHandler != null)
{
recoverMessages(messageRecoveryHandler);
@@ -1843,4 +1834,72 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
return _environmentFacade.getOpenDatabase(XID_DB_NAME);
}
+ class UpgradeTask implements EnvironmentFacadeTask
+ {
+
+ private ConfiguredObject<?> _parent;
+
+ public UpgradeTask(ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ new Upgrader(facade.getEnvironment(), _parent).upgradeIfNecessary();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot upgrade store", e);
+ }
+ }
+ }
+
+ class OpenDatabasesTask implements EnvironmentFacadeTask
+ {
+ private String[] _names;
+
+ public OpenDatabasesTask(String[] names)
+ {
+ _names = names;
+ }
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ facade.openDatabases(dbConfig, _names);
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot open databases", e);
+ }
+ }
+
+ }
+
+ class DiskSpaceTask implements EnvironmentFacadeTask
+ {
+
+ @Override
+ public void execute(EnvironmentFacade facade)
+ {
+ try
+ {
+ _totalStoreSize = facade.getEnvironment().getStats(null).getTotalLogSize();
+ }
+ catch(DatabaseException e)
+ {
+ throw facade.handleDatabaseException("Cannot evaluate disk store size", e);
+ }
+ }
+
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
index fd064d9b0e..2e02a6cfed 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
@@ -26,8 +26,12 @@ public interface EnvironmentFacadeFactory
{
public static final String ENVIRONMENT_CONFIGURATION = "bdbEnvironmentConfig";
- EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> storeSettings);
+ EnvironmentFacade createEnvironmentFacade(Map<String, Object> storeSettings, EnvironmentFacadeTask... initialisationTasks);
String getType();
+ public static interface EnvironmentFacadeTask
+ {
+ void execute(EnvironmentFacade facade);
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 8117ca1a9a..6065be5fa9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -42,7 +43,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
private Environment _environment;
- public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes, EnvironmentFacadeTask[] initialisationTasks)
{
_storePath = storePath;
@@ -74,6 +75,13 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
_environment = new Environment(environmentPath, envConfig);
+ if (initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : initialisationTasks)
+ {
+ task.execute(this);
+ }
+ }
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
index cc38b799a6..9506b1c20a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.store.MessageStore;
public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -32,25 +30,18 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public EnvironmentFacade createEnvironmentFacade(Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- final String defaultPath = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK) + File.separator + "bdbstore" + File.separator + virtualHostName;
-
Object environmentConfigurationAttributes = messageStoreSettings.get(ENVIRONMENT_CONFIGURATION);
if (environmentConfigurationAttributes instanceof Map)
{
envConfigMap.putAll((Map<String, String>) environmentConfigurationAttributes);
}
String storeLocation = (String) messageStoreSettings.get(MessageStore.STORE_PATH);
- if(storeLocation == null)
- {
- storeLocation = defaultPath;
- }
-
- return new StandardEnvironmentFacade(storeLocation, envConfigMap);
+ return new StandardEnvironmentFacade(storeLocation, envConfigMap, initialisationTasks);
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e15e9bdcc..b8192ea741 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -40,12 +40,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
*/
put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
/**
- * Parameter changed from default (off) to allow the Environment to start in the
+ * Parameter changed from default (off) to allow the Environment to start in the
* UNKNOWN state when the majority is not available.
*/
put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
@@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ private AtomicBoolean _initialised;
+ private EnvironmentFacadeTask[] _initialisationTasks;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
@@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ _initialised = new AtomicBoolean();
+ _initialisationTasks = initialisationTasks;
_configuration = configuration;
_durability = Durability.parse(_configuration.getDurability());
@@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
+
if (state == ReplicatedEnvironment.State.MASTER)
{
- reopenDatabases();
+ onMasterStateChange();
}
}
@@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_lastKnownEnvironmentState = state;
}
+ private void onMasterStateChange()
+ {
+ reopenDatabases();
+
+ if (_initialised.compareAndSet(false, true))
+ {
+ if (_initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : _initialisationTasks)
+ {
+ task.execute(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ }
+
private void reopenDatabases()
{
if (_state.get() == State.OPEN)
@@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
nodeState = ReplicatedEnvironment.State.UNKNOWN;
}
-
+
currentGroupState.put(node.getName(), nodeState);
return null;
}
@@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index c6b3e48cf8..8216cfc484 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -47,7 +47,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
private static final boolean DEFAULT_COALESCING_SYNC = true;
@Override
- public EnvironmentFacade createEnvironmentFacade(String virtualHostName, final Map<String, Object> messageStoreSettings)
+ public EnvironmentFacade createEnvironmentFacade(final Map<String, Object> messageStoreSettings, EnvironmentFacadeTask... initialisationTasks)
{
ReplicatedEnvironmentConfiguration configuration = new ReplicatedEnvironmentConfiguration()
{
@@ -126,7 +126,7 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
return durability == null ? DEFAULT_DURABILITY.toString() : durability;
}
};
- return new ReplicatedEnvironmentFacade(configuration);
+ return new ReplicatedEnvironmentFacade(configuration, initialisationTasks);
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
index b19e18b204..a82bb066e2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
@@ -122,7 +122,7 @@ public class StandardEnvironmentFacadeTest extends QpidTestCase
EnvironmentFacade createEnvironmentFacade()
{
- return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap(), null);
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index cd7dd69c46..b342493c59 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -65,16 +63,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
- private VirtualHost _virtualHost = mock(VirtualHost.class);
public void setUp() throws Exception
{
super.setUp();
- TaskExecutor taskExecutor = mock(TaskExecutor.class);
- when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
- when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
-
_storePath = TestFileUtils.createTestDirectory("bdb", true);
setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
@@ -302,7 +295,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
State desiredState, StateChangeListener stateChangeListener)
{
ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, null);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
return ref;
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index ba84d0682a..465c49e0c4 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -242,9 +242,9 @@ public class BDBMessageStoreTest extends MessageStoreTest
MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
VirtualHost<?> virtualHost = getVirtualHostModel();
- newStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings());
+ newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
- newStore.recoverMessageStore(getVirtualHostModel(), recoveryHandler, null);
+ newStore.recoverMessageStore(recoveryHandler, null);
return newStore;
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 2e7bccf8f3..921eb916ea 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -362,7 +362,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
/**
*
- * TODO Raise Jira and resolve so this test can be reenabled.
+ * TODO (QPID-5650) Resolve so this test can be reenabled.
*
* Test that the queue configured to have a DLQ was recovered and has the alternate exchange
* and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 28ac79075e..e7b6adaf7a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -183,11 +183,20 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
- initialiseIfNecessary(virtualHostName, storeSettings);
+ initialiseIfNecessary(parent.getName(), storeSettings);
+ try
+ {
+ createOrOpenConfigurationStoreDatabase();
+ upgradeIfVersionTableExists(parent);
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Cannot create databases or upgrade", e);
+ }
}
}
@@ -212,14 +221,12 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
checkConfigurationStoreOpen();
try
{
- createOrOpenConfigurationStoreDatabase();
- upgradeIfVersionTableExists(parent);
recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
loadConfiguredObjects(recoveryHandler);
setConfigVersion(recoveryHandler.completeConfigurationRecovery());
@@ -266,27 +273,28 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
if (_messageStoreOpen.compareAndSet(false, true))
{
- initialiseIfNecessary(virtualHostName, messageStoreSettings);
+ initialiseIfNecessary(parent.getName(), messageStoreSettings);
+ try
+ {
+ createOrOpenMessageStoreDatabase();
+ upgradeIfNecessary(parent);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to activate message store ", e);
+ }
}
}
@Override
- public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
checkMessageStoreOpen();
- try
- {
- createOrOpenMessageStoreDatabase();
- upgradeIfNecessary(parent);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
+
if(messageRecoveryHandler != null)
{
try
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
index 84f24df1cc..5975bf58b3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -32,7 +32,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 918a9b0134..624c451df6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -29,6 +29,8 @@ public interface DurableConfigurationStore
{
String STORE_TYPE = "storeType";
String STORE_PATH = "storePath";
+ String IS_MESSAGE_STORE_TOO = "isMessageStoreToo";
+
public static interface Source
{
@@ -38,17 +40,16 @@ public interface DurableConfigurationStore
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- * @param virtualHostName host name
+ * @param parent host name
* @param storeSettings store settings
*/
- void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings) throws StoreException;
+ void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
/**
* Recovers configuration from the store using given recovery handler
- * @param parent parent
* @param recoveryHandler recovery handler
*/
- void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
+ void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
/**
* Makes the specified object persistent.
@@ -81,6 +82,6 @@ public interface DurableConfigurationStore
*/
void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
-
void closeConfigurationStore() throws StoreException;
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 97559c8286..0eca9adda0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -89,16 +89,16 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
- _name = virtualHostName;
+ _name = parent.getName();
setup(storeSettings);
load();
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
recoveryHandler.beginConfigurationRecovery(this,_configVersion);
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index b6b65087a4..69f9073f6e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -38,18 +38,17 @@ public interface MessageStore
/**
* Called after instantiation in order to open and initialize the message store. A particular implementation can define
* whatever parameters it wants.
- * @param virtualHostName virtual host name
+ * @param parent virtual host name
* @param messageStoreSettings store settings
*/
- void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings);
+ void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
/**
* Called after opening to recover messages and transactions with given recovery handlers
- * @param parent TODO
* @param messageRecoveryHandler
* @param transactionLogRecoveryHandler
*/
- void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
+ void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 66c3fe6cae..59b4530014 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -26,13 +26,14 @@ import org.apache.qpid.server.model.ConfiguredObject;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
+
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
}
@@ -58,7 +59,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
}
@@ -91,7 +92,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
}
@@ -110,4 +111,5 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
public void onDelete()
{
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index f6fa6344d3..7e0562afec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -30,7 +30,6 @@ import java.util.Map.Entry;
import java.util.UUID;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.model.Binding;
@@ -52,7 +51,6 @@ public class DefaultUpgraderProvider implements UpgraderProvider
public static final String EXCLUSIVE = "exclusive";
public static final String NAME = "name";
- private final ExchangeRegistry _exchangeRegistry;
private final VirtualHost _virtualHost;
@SuppressWarnings("serial")
@@ -66,11 +64,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider
private final Map<String, UUID> _defaultExchangeIds;
- public DefaultUpgraderProvider(final VirtualHost virtualHost,
- final ExchangeRegistry exchangeRegistry)
+ public DefaultUpgraderProvider(final VirtualHost virtualHost)
{
_virtualHost = virtualHost;
- _exchangeRegistry = exchangeRegistry;
Map<String, UUID> defaultExchangeIds = new HashMap<String, UUID>();
for (String exchangeName : DEFAULT_EXCHANGES.keySet())
{
@@ -150,7 +146,12 @@ public class DefaultUpgraderProvider implements UpgraderProvider
private boolean isTopicExchange(ConfiguredObjectRecord entry)
{
- UUID exchangeId = entry.getParents().get("Exchange").getId();
+ ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange");
+ if (exchangeRecord == null)
+ {
+ return false;
+ }
+ UUID exchangeId = exchangeRecord.getId();
if(_records.containsKey(exchangeId))
{
@@ -165,8 +166,8 @@ public class DefaultUpgraderProvider implements UpgraderProvider
return true;
}
- return _exchangeRegistry.getExchange(exchangeId) != null
- && _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
+ return _virtualHost.getExchange(exchangeId) != null
+ && _virtualHost.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE;
}
}
@@ -253,7 +254,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId);
return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName()))
- || _exchangeRegistry.getExchange(exchangeId) != null);
+ || _virtualHost.getExchange(exchangeId) != null);
}
private boolean unknownQueue(final UUID queueId)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 6b75c39c49..e3fd938225 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -1,4 +1,4 @@
-package org.apache.qpid.server.virtualhost;/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,7 +18,9 @@ package org.apache.qpid.server.virtualhost;/*
* under the License.
*
*/
+package org.apache.qpid.server.virtualhost;
+import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -27,6 +29,7 @@ import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
+
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
@@ -83,18 +86,21 @@ public class StandardVirtualHost extends AbstractVirtualHost
String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
_durableConfigurationStore = initialiseConfigurationStore(configurationStoreType);
boolean combinedStores = _durableConfigurationStore == _messageStore;
+ if (combinedStores)
+ {
+ configurationStoreSettings = new HashMap<String,Object>(messageStoreSettings);
+ configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
+ }
+
if (!combinedStores)
{
_configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
}
- DurableConfigurationRecoverer configRecoverer =
- new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
- _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), combinedStores ? messageStoreSettings: configurationStoreSettings);
+ _durableConfigurationStore.openConfigurationStore(virtualHost, configurationStoreSettings);
- _messageStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings());
+ _messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
@@ -103,13 +109,16 @@ public class StandardVirtualHost extends AbstractVirtualHost
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
}
- _durableConfigurationStore.recoverConfigurationStore(getModel(), configRecoverer);
+ DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
+ new DefaultUpgraderProvider(this), getEventLogger());
+
+ _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
// If store does not have entries for standard exchanges (amq.*), the following will create them.
initialiseModel();
VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
+ _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
attainActivation();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index fbd26208d8..83052110a1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -488,8 +488,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
closeConfigStore();
_configStore = createConfigStore();
- _configStore.openConfigurationStore("testName", _configurationStoreSettings);
- _configStore.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ ConfiguredObject<?> parent = mock(ConfiguredObject.class);
+ when(parent.getName()).thenReturn("testName");
+ _configStore.openConfigurationStore(parent, _configurationStoreSettings);
+ _configStore.recoverConfigurationStore(_recoveryHandler);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index bca16b6e70..1de24e371d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -37,8 +36,6 @@ import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
@@ -53,7 +50,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
private JsonFileConfigStore _store;
private HashMap<String, Object> _configurationStoreSettings;
- private String _virtualHostName;
+ private ConfiguredObject<?> _virtualHost;
private File _storeLocation;
@@ -64,7 +61,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void setUp() throws Exception
{
super.setUp();
- _virtualHostName = getName();
+
+ _virtualHost = mock(ConfiguredObject.class);
+ when(_virtualHost.getName()).thenReturn(getName());
_storeLocation = TestFileUtils.createTestDirectory("json", true);
_configurationStoreSettings = new HashMap<String, Object>();
_configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
@@ -90,7 +89,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, null);
try
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
fail("Store should not successfully configure if there is no path set");
}
catch (ServerScopedRuntimeException e)
@@ -105,7 +104,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, System.getProperty("file.separator"));
try
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
fail("Store should not successfully configure if there is an invalid path set");
}
catch (ServerScopedRuntimeException e)
@@ -116,8 +115,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testStartFromNoStore() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
InOrder inorder = inOrder(_recoveryHandler);
inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
@@ -130,12 +129,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final int NEW_CONFIG_VERSION = 42;
when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
InOrder inorder = inOrder(_recoveryHandler);
// first time the config version should be the initial version - 0
@@ -149,7 +148,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testCreateObject() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
final Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
@@ -157,15 +156,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.create(new ConfiguredObjectRecordImpl(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
public void testCreateAndUpdateObject() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
@@ -179,8 +178,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -188,7 +187,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testCreateAndRemoveObject() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final String queueType = Queue.class.getSimpleName();
Map<String,Object> queueAttr = Collections.singletonMap("name", (Object) "q1");
@@ -201,15 +200,15 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
_store.closeConfigurationStore();
}
public void testCreateUnknownObjectType() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
try
{
_store.create(new ConfiguredObjectRecordImpl(UUID.randomUUID(), "wibble", Collections.<String, Object>emptyMap()));
@@ -223,7 +222,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testTwoObjectsWithSameId() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID id = UUID.randomUUID();
_store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap()));
try
@@ -240,11 +239,11 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testChangeTypeOfObject() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID id = UUID.randomUUID();
_store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap()));
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
try
{
@@ -259,13 +258,13 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testLockFileGuaranteesExclusiveAccess() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
JsonFileConfigStore secondStore = new JsonFileConfigStore();
try
{
- secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ secondStore.openConfigurationStore(_virtualHost, _configurationStoreSettings);
fail("Should not be able to open a second store with the same path");
}
catch(ServerScopedRuntimeException e)
@@ -273,7 +272,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
// pass
}
_store.closeConfigurationStore();
- secondStore.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ secondStore.openConfigurationStore(_virtualHost, _configurationStoreSettings);
}
@@ -281,7 +280,7 @@ public class JsonFileConfigStoreTest extends QpidTestCase
public void testCreatedNestedObjects() throws Exception
{
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
final UUID queueId = new UUID(0, 1);
final UUID queue2Id = new UUID(1, 1);
@@ -311,8 +310,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents);
_store.update(true, bindingRecord, binding2Record);
_store.closeConfigurationStore();
- _store.openConfigurationStore(_virtualHostName, _configurationStoreSettings);
- _store.recoverConfigurationStore(mock(ConfiguredObject.class), _recoveryHandler);
+ _store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
+ _store.recoverConfigurationStore(_recoveryHandler);
verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index fc69a53c85..451a2744c3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -33,7 +33,6 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -69,8 +68,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
- _store.openMessageStore("test", storeSettings);
- _store.recoverMessageStore(mock(ConfiguredObject.class), recoveryHandler, null);
+ ConfiguredObject<?> parent = mock(ConfiguredObject.class);
+ when(parent.getName()).thenReturn("test");
+ _store.openMessageStore(parent, storeSettings);
+ _store.recoverMessageStore(recoveryHandler, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 1d95133784..51d3fc15d2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -46,11 +46,15 @@ public abstract class MessageStoreTestCase extends QpidTestCase
private MessageStore _store;
private Map<String, Object> _storeSettings;
+ private ConfiguredObject<?> _parent;
public void setUp() throws Exception
{
super.setUp();
+ _parent = mock(ConfiguredObject.class);
+ when(_parent.getName()).thenReturn("test");
+
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
@@ -65,8 +69,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_store = createMessageStore();
- _store.openMessageStore("test", _storeSettings);
- _store.recoverMessageStore(mock(ConfiguredObject.class), _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _store.openMessageStore(_parent, _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
}
protected abstract Map<String, Object> getStoreSettings() throws Exception;
@@ -106,8 +110,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_store.closeMessageStore();
_store = createMessageStore();
- _store.openMessageStore("test", _storeSettings);
- _store.recoverMessageStore(mock(ConfiguredObject.class), _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _store.openMessageStore(_parent, _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
}
private Record getTestRecord(long messageNumber)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index c90c6af220..f6a251a691 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -184,7 +184,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
_durableConfigurationRecoverer =
new DurableConfigurationRecoverer(_vhost.getName(), recovererMap,
- new DefaultUpgraderProvider(_vhost, _exchangeRegistry), new EventLogger());
+ new DefaultUpgraderProvider(_vhost), new EventLogger());
_store = mock(DurableConfigurationStore.class);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index f20ddbc367..406a20d557 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.model.ConfiguredObject;
public class QuotaMessageStore extends NullMessageStore
{
@@ -40,7 +41,7 @@ public class QuotaMessageStore extends NullMessageStore
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
Object overfullAttr = messageStoreSettings.get(MessageStore.OVERFULL_SIZE);
_persistentSizeHighThreshold = overfullAttr == null
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index 8ec576c7ca..e20196c98d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -53,20 +53,20 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
private Map<EventListener, Event[]> _eventListeners = new ConcurrentHashMap<EventListener, Event[]>();
@Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (storeSettings != null && storeSettings.get(REAL_STORE) != null)
{
final String realStore = (String) storeSettings.get(REAL_STORE);
_realDurableConfigurationStore = new DurableConfigurationStoreCreator().createMessageStore(realStore);
- _realDurableConfigurationStore.openConfigurationStore(virtualHostName, storeSettings);
+ _realDurableConfigurationStore.openConfigurationStore(parent, storeSettings);
}
}
@Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
- _realDurableConfigurationStore.recoverConfigurationStore(parent, recoveryHandler);
+ _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
}
private void configureDelays(Map<String, Object> delays)
@@ -134,7 +134,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
Object delaysAttr = messageStoreSettings.get(DELAYS);
@@ -160,7 +160,7 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
it.remove();
}
}
- _realMessageStore.openMessageStore(virtualHostName, messageStoreSettings);
+ _realMessageStore.openMessageStore(parent, messageStoreSettings);
if (_realDurableConfigurationStore == null)
{
@@ -294,9 +294,9 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- _realMessageStore.recoverMessageStore(parent, messageRecoveryHandler, transactionLogRecoveryHandler);
+ _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
}
@Override