summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-17 17:26:04 +0000
commitf5d67044a9797c397764a7ac1aa1a1ed4aa893a3 (patch)
treecf8a9cf6a5f741e31417ca4d32a6b708bb3b9fdd
parentf523b9e510fc90ce3f7f7d7c2960f3bfee3d42df (diff)
downloadqpid-python-f5d67044a9797c397764a7ac1aa1a1ed4aa893a3.tar.gz
QPID-4006: add support for using BDB HA to form an active-passive cluster for persistent messaging
- Includes support for setting BDB configuration parameters via the store configuration, both for the existing store and the new HA variant. - Removes the MessageStoreFactory and reverts store configuration to historical values. Applied patch from Keith Wall, Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339728 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/build.xml2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java120
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java490
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java184
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java253
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java227
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java75
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java8
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java218
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java144
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java20
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java163
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java233
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java234
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java288
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java (renamed from qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java)31
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java479
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java6
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java)10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java90
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java62
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java37
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java4
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java12
-rw-r--r--qpid/java/module.xml2
-rw-r--r--qpid/java/systests/etc/log.properties2
-rw-r--r--qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml6
-rw-r--r--qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml6
-rw-r--r--qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml4
-rw-r--r--qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java37
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java12
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java137
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-10.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-bdb.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-10.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby-spawn.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-10.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-8.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-9-1.testprofile2
-rw-r--r--qpid/java/test-profiles/java-dby.0-9.testprofile2
-rw-r--r--qpid/java/test-profiles/testprofile.defaults2
70 files changed, 3153 insertions, 725 deletions
diff --git a/qpid/java/bdbstore/build.xml b/qpid/java/bdbstore/build.xml
index 7c305c7c2f..7df048c691 100644
--- a/qpid/java/bdbstore/build.xml
+++ b/qpid/java/bdbstore/build.xml
@@ -17,7 +17,7 @@
- under the License.
-->
<project name="bdbstore" xmlns:ivy="antlib:org.apache.ivy.ant" default="build">
- <property name="module.depends" value="common broker" />
+ <property name="module.depends" value="management/common common broker" />
<property name="module.test.depends" value="test client common/test broker/test management/common systests" />
<property name="module.genpom" value="true"/>
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index a9ae4eb16d..789d5714c8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -33,11 +33,12 @@ import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.TransactionConfig;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.FieldTable;
@@ -128,8 +130,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected final StateManager _stateManager;
- protected TransactionConfig _transactionConfig = new TransactionConfig();
-
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -146,6 +146,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+ private Map<String, String> _envConfigMap;
+
public AbstractBDBMessageStore()
{
_stateManager = new StateManager(_eventManager);
@@ -161,7 +163,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = recoveryHandler;
@@ -179,12 +181,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageRecoveryHandler = messageRecoveryHandler;
_tlogRecoveryHandler = tlogRecoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
- public void activate() throws Exception
+ public synchronized void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
recoverConfig(_configRecoveryHandler);
recoverMessages(_messageRecoveryHandler);
@@ -231,11 +233,30 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_storeLocation = storeLocation;
+ _envConfigMap = getConfigMap(storeConfig, "envConfig");
+
LOGGER.info("Configuring BDB message store");
setupStore(environmentPath, name);
}
+ protected Map<String,String> getConfigMap(Configuration config, String prefix) throws ConfigurationException
+ {
+ final List<Object> argumentNames = config.getList(prefix + ".name");
+ final List<Object> argumentValues = config.getList(prefix + ".value");
+ final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
+
+ for (int i = 0; i < argumentNames.size(); i++)
+ {
+ final String argName = argumentNames.get(i).toString();
+ final String argValue = argumentValues.get(i).toString();
+
+ attributes.put(argName, argValue);
+ }
+
+ return Collections.unmodifiableMap(attributes);
+ }
+
@Override
public String getStoreLocation()
{
@@ -251,9 +272,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
void startWithNoRecover() throws AMQStoreException
{
- _stateManager.attainState(State.CONFIGURING);
- _stateManager.attainState(State.CONFIGURED);
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.INITIALISING);
+ _stateManager.attainState(State.INITIALISED);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
@@ -268,51 +289,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_totalStoreSize = getSizeOnDisk();
}
- protected Environment createEnvironment(File environmentPath) throws DatabaseException
- {
- LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
-
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
- try
- {
- return new Environment(environmentPath, envConfig);
- }
- catch (DatabaseException de)
- {
- if (de.getMessage().contains("Environment.setAllowCreate is false"))
- {
- //Allow the creation this time
- envConfig.setAllowCreate(true);
- if (_environment != null )
- {
- _environment.cleanLog();
- _environment.close();
- }
- return new Environment(environmentPath, envConfig);
- }
- else
- {
- throw de;
- }
- }
- }
+ protected abstract Environment createEnvironment(File environmentPath) throws DatabaseException;
public Environment getEnvironment()
{
@@ -352,14 +329,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*/
public void close() throws Exception
{
- if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.QUIESCED))
- {
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
-
- closeInternal();
-
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
- }
+ _stateManager.attainState(State.CLOSING);
+ closeInternal();
+ _stateManager.attainState(State.CLOSED);
}
protected void closeInternal() throws Exception
@@ -1504,6 +1476,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
return true;
}
+ @SuppressWarnings("unchecked")
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
if(metaData.isPersistent())
@@ -1914,4 +1887,25 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
return _persistentSizeHighThreshold;
}
+
+ private void setEnvironmentConfigProperties(EnvironmentConfig envConfig)
+ {
+ for (Map.Entry<String, String> configItem : _envConfigMap.entrySet())
+ {
+ LOGGER.debug("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+
+ protected EnvironmentConfig createEnvironmentConfig()
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+
+ setEnvironmentConfigProperties(envConfig);
+
+ return envConfig;
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
new file mode 100644
index 0000000000..f887c8ce36
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.store.HAMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.State;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.Durability.ReplicaAckPolicy;
+import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationFailureException;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.rep.InsufficientLogException;
+import com.sleepycat.je.rep.NetworkRestore;
+import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore
+{
+ private static final String MUTLI_SYNC = "MUTLI_SYNC";
+ private static final String DEFAULT_REPLICATION_POLICY =
+ MUTLI_SYNC + "," + SyncPolicy.NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
+
+ private String _groupName;
+ private String _nodeName;
+ private String _nodeHostPort;
+ private String _helperHostPort;
+ private String _replicationPolicy;
+ private Durability _replicationDurability;
+
+ private String _name;
+
+ private BDBHAMessageStoreManagerMBean _managedObject;
+
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
+ private CommitThreadWrapper _commitThreadWrapper;
+ private boolean _localMultiSyncCommits;
+ private boolean _autoDesignatedPrimary;
+ private Map<String, String> _repConfigMap;
+
+ @Override
+ public void configure(String name, Configuration storeConfig) throws Exception
+ {
+ //Mandatory configuration
+ _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig);
+ _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig);
+ _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig);
+ _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig);
+ _name = name;
+
+ //Optional configuration
+ _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim();
+ _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE);
+
+ if(_replicationPolicy.startsWith(MUTLI_SYNC))
+ {
+ _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.SYNC.name()));
+ _localMultiSyncCommits = true;
+ }
+ else
+ {
+ _replicationDurability = Durability.parse(_replicationPolicy);
+ _localMultiSyncCommits = false;
+ }
+
+ _repConfigMap = getConfigMap(storeConfig, "repConfig");
+
+ _managedObject = new BDBHAMessageStoreManagerMBean(this);
+ _managedObject.register();
+
+ super.configure(name, storeConfig);
+ }
+
+ @Override
+ protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
+ {
+ super.setupStore(storePath, name);
+
+ if(_localMultiSyncCommits)
+ {
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
+ }
+ }
+
+ private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException
+ {
+ if (!config.containsKey(key))
+ {
+ throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: "
+ + key.replace('.', '/'));
+ }
+ return config.getString(key);
+ }
+
+ @Override
+ protected Environment createEnvironment(File environmentPath) throws DatabaseException
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Environment path " + environmentPath.getAbsolutePath());
+ LOGGER.info("Group name " + _groupName);
+ LOGGER.info("Node name " + _nodeName);
+ LOGGER.info("Node host port " + _nodeHostPort);
+ LOGGER.info("Helper host port " + _helperHostPort);
+ LOGGER.info("Replication policy " + _replicationPolicy);
+ }
+
+ final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
+
+ replicationConfig.setHelperHosts(_helperHostPort);
+ setReplicationConfigProperties(replicationConfig);
+
+ final EnvironmentConfig envConfig = createEnvironmentConfig();
+ envConfig.setDurability(_replicationDurability);
+
+ ReplicatedEnvironment replicatedEnvironment = null;
+ try
+ {
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+ catch (final InsufficientLogException ile)
+ {
+ LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ NetworkRestore restore = new NetworkRestore();
+ NetworkRestoreConfig config = new NetworkRestoreConfig();
+ config.setRetainLogFiles(false);
+ restore.execute(ile, config);
+ replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig);
+ }
+
+ return replicatedEnvironment;
+ }
+
+ @Override
+ public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler,
+ Configuration config) throws Exception
+ {
+ super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config);
+
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+
+ replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
+ }
+
+ @Override
+ public synchronized void passivate()
+ {
+ if (_stateManager.isNotInState(State.INITIALISED))
+ {
+ LOGGER.debug("Store becoming passive");
+ _stateManager.attainState(State.INITIALISED);
+ }
+ }
+
+ @Override
+ protected void closeInternal() throws Exception
+ {
+ try
+ {
+ if(_localMultiSyncCommits)
+ {
+ _commitThreadWrapper.stopCommitThread();
+ }
+ super.closeInternal();
+ }
+ finally
+ {
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ }
+ }
+ }
+
+ @Override
+ protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException
+ {
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
+ tx.commit();
+
+
+ if(_localMultiSyncCommits)
+ {
+ return _commitThreadWrapper.commit(tx, syncCommit);
+ }
+ else
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeName()
+ {
+ return _nodeName;
+ }
+
+ public String getNodeHostPort()
+ {
+ return _nodeHostPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ return _helperHostPort;
+ }
+
+ public String getReplicationPolicy()
+ {
+ return _replicationPolicy;
+ }
+
+ public String getNodeState()
+ {
+ ReplicatedEnvironment.State state = getReplicatedEnvironment().getState();
+ return state.toString();
+ }
+
+ public Boolean isDesignatedPrimary()
+ {
+ return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary();
+ }
+
+ public List<Map<String, String>> getGroupMembers()
+ {
+ List<Map<String, String>> members = new ArrayList<Map<String,String>>();
+
+ for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes())
+ {
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName());
+ nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
+ members.add(nodeMap);
+ }
+
+ return members;
+ }
+
+ public void removeNodeFromGroup(String nodeName) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
+ }
+ }
+
+ public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException
+ {
+ try
+ {
+ final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
+ synchronized(replicatedEnvironment)
+ {
+ final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
+ replicatedEnvironment.setRepMutableConfig(newConfig);
+ }
+
+ LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group");
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e);
+ }
+ }
+
+ ReplicatedEnvironment getReplicatedEnvironment()
+ {
+ return (ReplicatedEnvironment)getEnvironment();
+ }
+
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException
+ {
+ try
+ {
+ createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
+ }
+ catch (OperationFailureException ofe)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Failed to update address for '" + nodeName +
+ "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e);
+ }
+ }
+
+ private ReplicationGroupAdmin createReplicationGroupAdmin()
+ {
+ final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
+ helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
+
+ final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
+ helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+
+ return new ReplicationGroupAdmin(_groupName, helpers);
+ }
+
+ private class BDBHAMessageStoreStateChangeListener implements StateChangeListener
+ {
+ private final Executor _executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException
+ {
+ com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState();
+
+ LOGGER.info("Received BDB event indicating transition to state " + state);
+
+ switch (state)
+ {
+ case MASTER:
+ activateStoreAsync();
+ break;
+ case REPLICA:
+ passivateStore();
+ break;
+ case DETACHED:
+ LOGGER.error("BDB replicated node in detached state, therefore passivating.");
+ passivateStore();
+ break;
+ case UNKNOWN:
+ LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
+ break;
+ default:
+ LOGGER.error("Unexpected state change: " + state);
+ throw new IllegalStateException("Unexpected state change: " + state);
+ }
+ }
+
+ /** synchronously calls passivate. This is acceptable because {@link HAMessageStore#passivate()} is expected to be fast */
+ private void passivateStore()
+ {
+ try
+ {
+ passivate();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Unable to passivate", e);
+ throw new RuntimeException("Unable to passivate", e);
+ }
+ }
+
+ /**
+ * Calls {@link MessageStore#activate()}.
+ *
+ * <p/>
+ *
+ * This is done a background thread, in line with
+ * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because
+ * activate may execute transactions, which can't complete until
+ * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned.
+ */
+ private void activateStoreAsync()
+ {
+ final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger();
+
+ _executor.execute(new Runnable()
+ {
+ private static final String _THREAD_NAME = "BDBHANodeActivationThread";
+
+ @Override
+ public void run()
+ {
+ Thread.currentThread().setName(_THREAD_NAME);
+ CurrentActor.set(new AbstractActor(_rootLogger)
+ {
+ @Override
+ public String getLogMessage()
+ {
+ return _THREAD_NAME;
+ }
+ });
+
+ try
+ {
+ activate();
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to activate on hearing MASTER change event",e);
+ }
+
+ }
+ });
+ }
+ }
+
+ @Override
+ public synchronized void activate() throws Exception
+ {
+ // Before proceeding, perform a log flush with an fsync
+ getEnvironment().flushLog(true);
+
+ super.activate();
+
+ //For replica groups with 2 electable nodes, set the new master to be the
+ //designated primary, such that it can continue working if the replica goes
+ //down and leaves it without a 'majority of 2'.
+ if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
+ {
+ setDesignatedPrimary(true);
+ }
+ }
+
+ private void setReplicationConfigProperties(ReplicationConfig replicationConfig)
+ {
+ for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+ {
+ LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
+ replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
new file mode 100644
index 0000000000..cd4e990607
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.management.AMQManagedObject;
+
+public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
+{
+ private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStoreManagerMBean.class);
+
+ private static final TabularType GROUP_MEMBERS_TABLE;
+ private static final CompositeType GROUP_MEMBER_ROW;
+ private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES;
+
+ static
+ {
+ try
+ {
+ GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
+ final String[] itemNames = new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
+ GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
+ itemNames,
+ itemDescriptions,
+ GROUP_MEMBER_ATTRIBUTE_TYPES );
+ GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
+ GROUP_MEMBER_ROW,
+ new String[] {BDBHAMessageStore.GRP_MEM_COL_NODE_NAME});
+ }
+ catch (final OpenDataException ode)
+ {
+ throw new ExceptionInInitializerError(ode);
+ }
+ }
+
+ private final BDBHAMessageStore _store;
+
+ protected BDBHAMessageStoreManagerMBean(BDBHAMessageStore store) throws NotCompliantMBeanException
+ {
+ super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE);
+ _store = store;
+ }
+
+ @Override
+ public String getObjectInstanceName()
+ {
+ return _store.getName();
+ }
+
+ @Override
+ public String getGroupName() throws IOException
+ {
+ return _store.getGroupName();
+ }
+
+ @Override
+ public String getNodeName() throws IOException
+ {
+ return _store.getNodeName();
+ }
+
+ @Override
+ public String getNodeHostPort() throws IOException
+ {
+ return _store.getNodeHostPort();
+ }
+
+ @Override
+ public String getHelperHostPort() throws IOException
+ {
+ return _store.getHelperHostPort();
+ }
+
+ @Override
+ public String getReplicationPolicy() throws IOException
+ {
+ return _store.getReplicationPolicy();
+ }
+
+ @Override
+ public String getNodeState() throws IOException
+ {
+ return _store.getNodeState();
+ }
+
+ @Override
+ public boolean getDesignatedPrimary() throws IOException
+ {
+ return _store.isDesignatedPrimary();
+ }
+
+ @Override
+ public TabularData getAllNodesInGroup() throws IOException, JMException
+ {
+ final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
+ final List<Map<String, String>> members = _store.getGroupMembers();
+
+ for (Map<String, String> map : members)
+ {
+ CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
+ data.put(memberData);
+ }
+ return data;
+ }
+
+ @Override
+ public void removeNodeFromGroup(String nodeName) throws JMException
+ {
+ try
+ {
+ _store.removeNodeFromGroup(nodeName);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to remove node " + nodeName + " from group", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void setDesignatedPrimary(boolean primary) throws JMException
+ {
+ try
+ {
+ _store.setDesignatedPrimary(primary);
+ }
+ catch (AMQStoreException e)
+ {
+ LOGGER.error("Failed to set node " + _store.getNodeName() + " as designated primary", e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
+ {
+ try
+ {
+ _store.updateAddress(nodeName, newHostName, newPort);
+ }
+ catch(AMQStoreException e)
+ {
+ LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
+ throw new JMException(e.getMessage());
+ }
+ }
+
+
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index b414441b92..7c29e281d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,23 +21,12 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
@@ -53,52 +42,22 @@ import com.sleepycat.je.EnvironmentConfig;
public class BDBMessageStore extends AbstractBDBMessageStore
{
private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class);
-
- private final CommitThread _commitThread = new CommitThread("Commit-Thread");
-
- private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>();
+ private CommitThreadWrapper _commitThreadWrapper;
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
super.setupStore(storePath, name);
- startCommitThread();
+ _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
+ _commitThreadWrapper.startCommitThread();
}
protected Environment createEnvironment(File environmentPath) throws DatabaseException
{
LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath());
- EnvironmentConfig envConfig = new EnvironmentConfig();
- // This is what allows the creation of the store if it does not already exist.
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
-
- Properties props = System.getProperties();
-
- for(String propName : props.stringPropertyNames())
- {
- if(propName.startsWith("qpid.bdb.envconfig.je."))
- {
- envConfig.setConfigParam(propName.substring(19), props.getProperty(propName));
- }
- }
-
- envConfig.setConfigParam("je.lock.nLockTables", "7");
-
- // Added to help diagnosis of Deadlock issue
- // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
- if (Boolean.getBoolean("qpid.bdb.lock.debug"))
- {
- envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
- envConfig.setConfigParam("je.txn.dumpLocks", "true");
- }
-
- // Set transaction mode
- _transactionConfig.setReadCommitted(true);
+ EnvironmentConfig envConfig = createEnvironmentConfig();
- //This prevents background threads running which will potentially update the store.
- envConfig.setReadOnly(false);
try
{
return new Environment(environmentPath, envConfig);
@@ -118,12 +77,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore
}
}
-
-
@Override
protected void closeInternal() throws Exception
{
- stopCommitThread();
+ _commitThreadWrapper.stopCommitThread();
super.closeInternal();
}
@@ -133,204 +90,6 @@ public class BDBMessageStore extends AbstractBDBMessageStore
{
tx.commitNoSync();
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
- commitFuture.commit();
-
- return commitFuture;
- }
-
- private void startCommitThread()
- {
- _commitThread.start();
- }
-
- private void stopCommitThread() throws InterruptedException
- {
- _commitThread.close();
- _commitThread.join();
- }
-
- private static final class BDBCommitFuture implements StoreFuture
- {
- private final CommitThread _commitThread;
- private final com.sleepycat.je.Transaction _tx;
- private DatabaseException _databaseException;
- private boolean _complete;
- private boolean _syncCommit;
-
- public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
- {
- _commitThread = commitThread;
- _tx = tx;
- _syncCommit = syncCommit;
- }
-
- public synchronized void complete()
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
- }
- _complete = true;
-
- notifyAll();
- }
-
- public synchronized void abort(DatabaseException databaseException)
- {
- _complete = true;
- _databaseException = databaseException;
-
- notifyAll();
- }
-
- public void commit() throws DatabaseException
- {
- _commitThread.addJob(this, _syncCommit);
-
- if(!_syncCommit)
- {
- LOGGER.debug("CommitAsync was requested, returning immediately.");
- return;
- }
-
- waitForCompletion();
-
- if (_databaseException != null)
- {
- throw _databaseException;
- }
-
- }
-
- public synchronized boolean isComplete()
- {
- return _complete;
- }
-
- public synchronized void waitForCompletion()
- {
- while (!isComplete())
- {
- _commitThread.explicitNotify();
- try
- {
- wait(250);
- }
- catch (InterruptedException e)
- {
- //TODO Should we ignore, or throw a 'StoreException'?
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- /**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
- * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
- * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
- */
- private class CommitThread extends Thread
- {
- private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
- private final CheckpointConfig _config = new CheckpointConfig();
- private final Object _lock = new Object();
-
- public CommitThread(String name)
- {
- super(name);
- _config.setForce(true);
-
- }
-
- public void explicitNotify()
- {
- synchronized (_lock)
- {
- _lock.notify();
- }
- }
-
- public void run()
- {
- while (!_stopped.get())
- {
- synchronized (_lock)
- {
- while (!_stopped.get() && !hasJobs())
- {
- try
- {
- // RHM-7 Periodically wake up and check, just in case we
- // missed a notification. Don't want to lock the broker hard.
- _lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- processJobs();
- }
- }
-
- private void processJobs()
- {
- int size = _jobQueue.size();
-
- try
- {
- getEnvironment().flushLog(true);
-
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.complete();
- }
-
- }
- catch (DatabaseException e)
- {
- for(int i = 0; i < size; i++)
- {
- BDBCommitFuture commit = _jobQueue.poll();
- commit.abort(e);
- }
- }
-
- }
-
- private boolean hasJobs()
- {
- return !_jobQueue.isEmpty();
- }
-
- public void addJob(BDBCommitFuture commit, final boolean sync)
- {
-
- _jobQueue.add(commit);
- if(sync)
- {
- synchronized (_lock)
- {
- _lock.notifyAll();
- }
- }
- }
-
- public void close()
- {
- synchronized (_lock)
- {
- _stopped.set(true);
- _lock.notifyAll();
- }
- }
+ return _commitThreadWrapper.commit(tx, syncCommit);
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
new file mode 100644
index 0000000000..5b0abbec93
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -0,0 +1,227 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
+
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+
+public class CommitThreadWrapper
+{
+ private final CommitThread _commitThread;
+
+ public CommitThreadWrapper(String name, Environment env)
+ {
+ _commitThread = new CommitThread(name, env);
+ }
+
+ public void startCommitThread()
+ {
+ _commitThread.start();
+ }
+
+ public void stopCommitThread() throws InterruptedException
+ {
+ _commitThread.close();
+ _commitThread.join();
+ }
+
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ commitFuture.commit();
+ return commitFuture;
+ }
+
+ private static final class BDBCommitFuture implements StoreFuture
+ {
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+
+ private final CommitThread _commitThread;
+ private final Transaction _tx;
+ private DatabaseException _databaseException;
+ private boolean _complete;
+ private boolean _syncCommit;
+
+ public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ {
+ _commitThread = commitThread;
+ _tx = tx;
+ _syncCommit = syncCommit;
+ }
+
+ public synchronized void complete()
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")");
+ }
+ _complete = true;
+
+ notifyAll();
+ }
+
+ public synchronized void abort(DatabaseException databaseException)
+ {
+ _complete = true;
+ _databaseException = databaseException;
+
+ notifyAll();
+ }
+
+ public void commit() throws DatabaseException
+ {
+ _commitThread.addJob(this, _syncCommit);
+
+ if(!_syncCommit)
+ {
+ LOGGER.debug("CommitAsync was requested, returning immediately.");
+ return;
+ }
+
+ waitForCompletion();
+
+ if (_databaseException != null)
+ {
+ throw _databaseException;
+ }
+
+ }
+
+ public synchronized boolean isComplete()
+ {
+ return _complete;
+ }
+
+ public synchronized void waitForCompletion()
+ {
+ while (!isComplete())
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(250);
+ }
+ catch (InterruptedException e)
+ {
+ //TODO Should we ignore, or throw a 'StoreException'?
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
+ * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
+ * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
+ */
+ private static class CommitThread extends Thread
+ {
+ private final AtomicBoolean _stopped = new AtomicBoolean(false);
+ private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+ private Environment _environment;
+
+ public CommitThread(String name, Environment env)
+ {
+ super(name);
+ _config.setForce(true);
+ _environment = env;
+ }
+
+ public void explicitNotify()
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+
+ public void run()
+ {
+ while (!_stopped.get())
+ {
+ synchronized (_lock)
+ {
+ while (!_stopped.get() && !hasJobs())
+ {
+ try
+ {
+ // RHM-7 Periodically wake up and check, just in case we
+ // missed a notification. Don't want to lock the broker hard.
+ _lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ processJobs();
+ }
+ }
+
+ private void processJobs()
+ {
+ int size = _jobQueue.size();
+
+ try
+ {
+ _environment.flushLog(true);
+
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.complete();
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ for(int i = 0; i < size; i++)
+ {
+ BDBCommitFuture commit = _jobQueue.poll();
+ commit.abort(e);
+ }
+ }
+
+ }
+
+ private boolean hasJobs()
+ {
+ return !_jobQueue.isEmpty();
+ }
+
+ public void addJob(BDBCommitFuture commit, final boolean sync)
+ {
+
+ _jobQueue.add(commit);
+ if(sync)
+ {
+ synchronized (_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_lock)
+ {
+ _stopped.set(true);
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
new file mode 100644
index 0000000000..bfc7bbf128
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+
+import javax.management.JMException;
+import javax.management.openmbean.TabularData;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+
+public interface ManagedBDBHAMessageStore
+{
+ public static final String TYPE = "BDBHAMessageStore";
+
+ public static final String ATTR_GROUP_NAME = "GroupName";
+ public static final String ATTR_NODE_NAME = "NodeName";
+ public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
+ public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
+ public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy";
+ public static final String ATTR_NODE_STATE = "NodeState";
+ public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
+
+ @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
+ String getGroupName() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying the node within the group")
+ String getNodeName() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to replicate data between this node and others in the group")
+ String getNodeHostPort() throws IOException;
+
+ @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this node")
+ String getNodeState() throws IOException;
+
+ @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
+ String getHelperHostPort() throws IOException;
+
+ @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy")
+ String getReplicationPolicy() throws IOException;
+
+ @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
+ boolean getDesignatedPrimary() throws IOException;
+
+ @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
+ TabularData getAllNodesInGroup() throws IOException, JMException;
+
+ @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
+ void removeNodeFromGroup(String nodeName) throws JMException;
+
+ @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this node as the designated primary for the group. Applicable to the two node case.")
+ void setDesignatedPrimary(boolean primary) throws JMException;
+
+ @MBeanOperation(name="updateAddress", description="Update the address of another node. The node must be in a STOPPED state.")
+ void updateAddress(String nodeName, String newHostName, int newPort) throws JMException;
+}
+
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
index e71e39cbb8..f1ab012efc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
@@ -24,7 +24,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -63,7 +63,7 @@ public class Upgrader
if(versionDb.count() == 0L)
{
- int sourceVersion = isEmpty ? BDBMessageStore.VERSION: identifyOldStoreVersion();
+ int sourceVersion = isEmpty ? AbstractBDBMessageStore.VERSION: identifyOldStoreVersion();
DatabaseEntry key = new DatabaseEntry();
IntegerBinding.intToEntry(sourceVersion, key);
DatabaseEntry value = new DatabaseEntry();
@@ -87,7 +87,7 @@ public class Upgrader
int getSourceVersion(Database versionDb)
{
- int version = BDBMessageStore.VERSION + 1;
+ int version = AbstractBDBMessageStore.VERSION + 1;
OperationStatus result;
do
@@ -106,7 +106,7 @@ public class Upgrader
void performUpgradeFromVersion(int sourceVersion, Database versionDb)
throws AMQStoreException
{
- while(sourceVersion != BDBMessageStore.VERSION)
+ while(sourceVersion != AbstractBDBMessageStore.VERSION)
{
upgrade(sourceVersion, ++sourceVersion);
DatabaseEntry key = new DatabaseEntry();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
new file mode 100644
index 0000000000..00f99b7097
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+
+public class BDBHAMessageStoreManagerMBeanTest extends TestCase
+{
+ private static final String TEST_GROUP_NAME = "testGroupName";
+ private static final String TEST_NODE_NAME = "testNodeName";
+ private static final String TEST_NODE_HOST_PORT = "host:1234";
+ private static final String TEST_HELPER_HOST_PORT = "host:5678";
+ private static final String TEST_REPLICATION_POLICY = "sync,sync,all";
+ private static final String TEST_NODE_STATE = "MASTER";
+ private static final String TEST_STORE_NAME = "testStoreName";
+ private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
+
+ private BDBHAMessageStore _store;
+ private BDBHAMessageStoreManagerMBean _mBean;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ _store = mock(BDBHAMessageStore.class);
+ _mBean = new BDBHAMessageStoreManagerMBean(_store);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ CurrentActor.remove();
+ }
+
+ public void testObjectName() throws Exception
+ {
+ when(_store.getName()).thenReturn(TEST_STORE_NAME);
+
+ String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + TEST_STORE_NAME;
+ assertEquals(expectedObjectName, _mBean.getObjectName().toString());
+ }
+
+ public void testGroupName() throws Exception
+ {
+ when(_store.getGroupName()).thenReturn(TEST_GROUP_NAME);
+
+ assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
+ }
+
+ public void testNodeName() throws Exception
+ {
+ when(_store.getNodeName()).thenReturn(TEST_NODE_NAME);
+
+ assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
+ }
+
+ public void testNodeHostPort() throws Exception
+ {
+ when(_store.getNodeHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+
+ assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
+ }
+
+ public void testHelperHostPort() throws Exception
+ {
+ when(_store.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+
+ assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
+ }
+
+ public void testReplicationPolicy() throws Exception
+ {
+ when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY);
+
+ assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY));
+ }
+
+ public void testNodeState() throws Exception
+ {
+ when(_store.getNodeState()).thenReturn(TEST_NODE_STATE);
+
+ assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
+ }
+
+ public void testDesignatedPrimaryFlag() throws Exception
+ {
+ when(_store.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+
+ assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
+ }
+
+ public void testGroupMembersForGroupWithOneNode() throws Exception
+ {
+ List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
+ when(_store.getGroupMembers()).thenReturn(members);
+
+ final TabularData resultsTable = _mBean.getAllNodesInGroup();
+
+ assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT);
+
+ final int numberOfDataRows = resultsTable.size();
+ assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
+ final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
+ assertEquals(TEST_NODE_NAME, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+
+ public void testRemoveNodeFromReplicationGroup() throws Exception
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+
+ verify(_store).removeNodeFromGroup(TEST_NODE_NAME);
+ }
+
+ public void testRemoveNodeFromReplicationGroupWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).removeNodeFromGroup(TEST_NODE_NAME);
+
+ try
+ {
+ _mBean.removeNodeFromGroup(TEST_NODE_NAME);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetAsDesignatedPrimary() throws Exception
+ {
+ _mBean.setDesignatedPrimary(true);
+
+ verify(_store).setDesignatedPrimary(true);
+ }
+
+ public void testSetAsDesignatedPrimaryWithError() throws Exception
+ {
+ doThrow(new AMQStoreException("mocked exception")).when(_store).setDesignatedPrimary(true);
+
+ try
+ {
+ _mBean.setDesignatedPrimary(true);
+ fail("Exception not thrown");
+ }
+ catch (JMException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testUpdateAddress() throws Exception
+ {
+ String newHostName = "newHostName";
+ int newPort = 1967;
+
+ _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
+
+ verify(_store).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ }
+
+ private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
+ {
+ CompositeType headingsRow = resultsTable.getTabularType().getRowType();
+ for (final String headingName : headingNames)
+ {
+ assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName));
+ }
+ }
+
+ private Map<String, String> createTestNodeResult()
+ {
+ Map<String, String> items = new HashMap<String, String>();
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
+ items.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
+ return items;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
new file mode 100644
index 0000000000..6f851bd94e
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreTest.java
@@ -0,0 +1,144 @@
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.net.InetAddress;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class BDBHAMessageStoreTest extends QpidTestCase
+{
+ private static final String TEST_LOG_FILE_MAX = "1000000";
+ private static final String TEST_ELECTION_RETRIES = "1000";
+ private static final String TEST_NUMBER_OF_THREADS = "10";
+ private static final String TEST_ENV_CONSISTENCY_TIMEOUT = "9999999";
+ private String _groupName;
+ private String _workDir;
+ private int _masterPort;
+ private String _host;
+ private XMLConfiguration _configXml;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _workDir = TMP_FOLDER + File.separator + getName();
+ _host = InetAddress.getByName("localhost").getHostAddress();
+ _groupName = "group" + getName();
+ _masterPort = -1;
+
+ FileUtils.delete(new File(_workDir), true);
+ _configXml = new XMLConfiguration();
+ }
+
+ public void tearDown() throws Exception
+ {
+ FileUtils.delete(new File(_workDir), true);
+ super.tearDown();
+ }
+
+ public void testSetSystemConfiguration() throws Exception
+ {
+ // create virtual host configuration, registry and host instance
+ addVirtualHostConfiguration();
+ TestApplicationRegistry registry = initialize();
+ try
+ {
+ VirtualHost virtualhost = registry.getVirtualHostRegistry().getVirtualHost("test" + _masterPort);
+ BDBHAMessageStore store = (BDBHAMessageStore) virtualhost.getMessageStore();
+
+ // test whether JVM system settings were applied
+ Environment env = store.getEnvironment();
+ assertEquals("Unexpected number of cleaner threads", TEST_NUMBER_OF_THREADS, env.getConfig().getConfigParam(EnvironmentConfig.CLEANER_THREADS));
+ assertEquals("Unexpected log file max", TEST_LOG_FILE_MAX, env.getConfig().getConfigParam(EnvironmentConfig.LOG_FILE_MAX));
+
+ ReplicatedEnvironment repEnv = store.getReplicatedEnvironment();
+ assertEquals("Unexpected number of elections primary retries", TEST_ELECTION_RETRIES,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES));
+ assertEquals("Unexpected number of elections primary retries", TEST_ENV_CONSISTENCY_TIMEOUT,
+ repEnv.getConfig().getConfigParam(ReplicationConfig.ENV_CONSISTENCY_TIMEOUT));
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void addVirtualHostConfiguration() throws Exception
+ {
+ int port = findFreePort();
+ if (_masterPort == -1)
+ {
+ _masterPort = port;
+ }
+ String nodeName = getNodeNameForNodeAt(port);
+
+ String vhostName = "test" + port;
+ String vhostPrefix = "virtualhosts.virtualhost." + vhostName;
+
+ _configXml.addProperty("virtualhosts.virtualhost.name", vhostName);
+ _configXml.addProperty(vhostPrefix + ".store.class", BDBHAMessageStore.class.getName());
+ _configXml.addProperty(vhostPrefix + ".store.environment-path", _workDir + File.separator
+ + port);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.groupName", _groupName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeName", nodeName);
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.nodeHostPort",
+ getNodeHostPortForNodeAt(port));
+ _configXml.addProperty(vhostPrefix + ".store.highAvailability.helperHostPort",
+ getHelperHostPort());
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.CLEANER_THREADS);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_NUMBER_OF_THREADS);
+
+ _configXml.addProperty(vhostPrefix + ".store.envConfig(-1).name", EnvironmentConfig.LOG_FILE_MAX);
+ _configXml.addProperty(vhostPrefix + ".store.envConfig.value", TEST_LOG_FILE_MAX);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ELECTION_RETRIES);
+
+ _configXml.addProperty(vhostPrefix + ".store.repConfig(-1).name", ReplicationConfig.ENV_CONSISTENCY_TIMEOUT);
+ _configXml.addProperty(vhostPrefix + ".store.repConfig.value", TEST_ENV_CONSISTENCY_TIMEOUT);
+ }
+
+ private String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + getName() + bdbPort;
+ }
+
+ private String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _host + ":" + bdbPort;
+ }
+
+ private String getHelperHostPort()
+ {
+ if (_masterPort == -1)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+ return _host + ":" + _masterPort;
+ }
+
+ private TestApplicationRegistry initialize() throws Exception
+ {
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ ServerConfiguration configuration = new ServerConfiguration(_configXml);
+ TestApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test" + _masterPort);
+ return registry;
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index a318187f13..591bc27d1e 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -70,7 +70,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
// Create content ByteBuffers.
// Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -220,11 +220,11 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
* Use this method instead of reloading the virtual host like other tests in order
* to avoid the recovery handler deleting the message for not being on a queue.
*/
- private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
+ private AbstractBDBMessageStore reloadStore(AbstractBDBMessageStore messageStore) throws Exception
{
messageStore.close();
- BDBMessageStore newStore = new BDBMessageStore();
+ AbstractBDBMessageStore newStore = new BDBMessageStore();
newStore.configure("", _config.subset("store"));
newStore.startWithNoRecover();
@@ -282,7 +282,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testGetContentWithOffset() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -342,7 +342,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
public void testMessageCreationAndRemoval() throws Exception
{
MessageStore store = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(store);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(store);
StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
long messageid_0_8 = storedMessage_0_8.getMessageNumber();
@@ -367,12 +367,12 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
assertEquals("Retrieved content when none was expected",
0, bdbStore.getContent(messageid_0_8, 0, dst));
}
- private BDBMessageStore assertBDBStore(MessageStore store)
+ private AbstractBDBMessageStore assertBDBStore(MessageStore store)
{
assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
- return (BDBMessageStore) store;
+ return (AbstractBDBMessageStore) store;
}
private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
@@ -405,7 +405,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -443,7 +443,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
@@ -484,7 +484,7 @@ public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageSto
{
MessageStore log = getVirtualHost().getMessageStore();
- BDBMessageStore bdbStore = assertBDBStore(log);
+ AbstractBDBMessageStore bdbStore = assertBDBStore(log);
final UUID mockQueueId = UUIDGenerator.generateUUID();
TransactionLogResource mockQueue = new TransactionLogResource()
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
new file mode 100644
index 0000000000..afe0435901
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * The HA black box tests test the BDB cluster as a opaque unit. Client connects to
+ * the cluster via a failover url
+ *
+ * @see HAClusterWhiteboxTest
+ */
+public class HAClusterBlackboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterBlackboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+ private static final int NUMBER_OF_NODES = 3;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ private FailoverAwaitingListener _failoverAwaitingListener;
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME)
+ + File.separator + "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+
+ _clusterCreator.startCluster();
+ _failoverAwaitingListener = new FailoverAwaitingListener();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testLossOfActiveNodeCausesClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ _clusterCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Node is stopped");
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testLossOfInactiveNodeDoesNotCauseClientToFailover() throws Exception
+ {
+ LOGGER.info("Connecting to " + _brokerFailoverUrl);
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+
+ LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+
+ _clusterCreator.stopNode(inactiveBrokerPort);
+
+ _failoverAwaitingListener.assertFailoverDoesNotOccur(2000);
+
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private final class FailoverAwaitingListener implements ConnectionListener
+ {
+ private final CountDownLatch _failoverLatch = new CountDownLatch(1);
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public void assertFailoverOccurs(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
+ }
+
+ public void assertFailoverDoesNotOccur(long delay) throws InterruptedException
+ {
+ _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover occurred unexpectedly", 1L, _failoverLatch.getCount());
+ }
+
+
+ @Override
+ public void failoverComplete()
+ {
+ _failoverLatch.countDown();
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ }
+
+ @Override
+ public void bytesReceived(long count)
+ {
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
new file mode 100644
index 0000000000..1afa45fd5a
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.EnvironmentFailureException;
+
+/**
+ * System test verifying the ability to control a cluster via the Management API.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterManagementTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);
+
+ private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 4;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+ _jmxUtils.setUp();
+
+ _clusterCreator.configureClusterNodes();
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testReadonlyMBeanAttributes() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+ final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
+ assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
+ assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
+ assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
+ // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
+ assertNotNull("Store state must not be null", storeBean.getNodeState());
+ }
+
+ public void testStateOfActiveBrokerIsMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
+ assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
+ }
+
+ public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+ final String nodeState = storeBean.getNodeState();
+ assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
+ }
+
+ public void testGroupMembers() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ final TabularData groupMembers = storeBean.getAllNodesInGroup();
+ assertNotNull(groupMembers);
+
+ final int numberOfDataRows = groupMembers.size();
+ assertEquals("Unexpected number of data rows", NUMBER_OF_NODES ,numberOfDataRows);
+
+ for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
+ {
+ final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
+ final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
+
+ CompositeData row = groupMembers.get(new Object[] {nodeName});
+ assertNotNull("Table does not contain row for node name " + nodeName, row);
+ assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+ }
+
+ public void testRemoveNodeFromGroup() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
+ final int numberOfDataRows = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES ,numberOfDataRows);
+
+ final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
+ storeBean.removeNodeFromGroup(removedNodeName);
+
+ final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
+ }
+
+ /**
+ * Updates the address of a node.
+ *
+ * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
+ * assert.
+ *
+ * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
+ */
+ public void testUpdateAddress() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ }
+
+ /**
+ * @see #testUpdateAddress()
+ */
+ public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
+
+ _clusterCreator.stopNode(brokerPortNumberToBeMoved);
+
+ final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
+ final int newBdbPort = getNextAvailable(oldBdbPort + 1);
+
+ // now deliberately don't call updateAddress
+
+ _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+
+ try
+ {
+ _clusterCreator.startNode(brokerPortNumberToBeMoved);
+ fail("Exception not thrown");
+ }
+ catch(RuntimeException rte)
+ {
+ //check cause was BDBs EnvironmentFailureException
+ assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
+ // PASS
+ }
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
new file mode 100644
index 0000000000..5f995ae25d
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class HAClusterTwoNodeTest extends QpidBrokerTestCase
+{
+ private static final long RECEIVE_TIMEOUT = 5000l;
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + VIRTUAL_HOST;
+ private static final int NUMBER_OF_NODES = 2;
+
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+ _jmxUtils.setUp();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ private void startCluster(boolean autoDesignedPrimary) throws Exception
+ {
+ setSystemProperty("java.util.logging.config.file",
+ System.getProperty(QPID_HOME) + File.separator + "etc" + File.separator + "log.properties");
+
+ String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST;
+
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 s");
+
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+ setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", "0");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary);
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+ }
+
+ /**
+ * Tests that a two node cluster, in which the master CAN automatically designate itself primary
+ * (after becoming master) continues to operate after being shut down and restarted.
+ *
+ * The test does not concern itself with which broker becomes master at any given point
+ * (which is likely to swap during the test).
+ */
+ public void testClusterRestartWithAutoDesignatedPrimary() throws Exception
+ {
+ testClusterRestartImpl(true);
+ }
+
+ /**
+ * Tests that a two node cluster, in which the master can NOT automatically designate itself
+ * primary (after becoming master) continues to operate after being shut down and restarted.
+ *
+ * The test does not concern itself with which broker becomes master at any given point
+ * (which is likely to swap during the test).
+ */
+ public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception
+ {
+ testClusterRestartImpl(false);
+ }
+
+ private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception
+ {
+ startCluster(autoDesignatedPrimary);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startClusterParallel();
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ /**
+ * This test make sure than JMS operations are still working after stopping replica
+ * when master is designated primary (which is by default).
+ * <p>
+ * When master is not designated primary this test should fail.
+ */
+ public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception
+ {
+
+ startCluster(false);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ try
+ {
+ assertProducingConsuming(connection);
+ fail("JMS peristent operations succeded on Master 'not designated primary' buy they should fail as replica is not available");
+ }
+ catch(JMSException e)
+ {
+ // JMSException should be thrown on transaction start/commit
+ }
+ }
+
+ public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+
+ try
+ {
+ getConnection(_brokerFailoverUrl);
+ fail("Connection not expected");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
+
+ final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ }
+
+ public void testSecondaryDesignatedAsPrimaryAfterOrginalPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
+
+ assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to new primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ return storeBean;
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+ }
+
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
new file mode 100644
index 0000000000..4b64466ff2
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import com.sleepycat.je.rep.InsufficientLogException;
+
+/**
+ * The HA white box tests test the BDB cluster where the test retains the knowledge of the
+ * individual test nodes. It uses this knowledge to examine the nodes to ensure that they
+ * remain in the correct state throughout the test.
+ *
+ * @see HAClusterBlackboxTest
+ */
+public class HAClusterWhiteboxTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(HAClusterWhiteboxTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+
+ private final int NUMBER_OF_NODES = 3;
+ private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ // TODO Test for node falling behind
+ // TODO Factory refactoring?? // MessageStore construction??
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setBrokerEnvironment("QPID_OPTS", "-Djava.util.logging.config.file=" + System.getProperty(QPID_HOME)
+ + File.separator + "etc" + File.separator + "log.properties");
+
+ _clusterCreator.configureClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testClusterPermitsConnectionToOnlyOneNode() throws Exception
+ {
+ int connectionSuccesses = 0;
+ int connectionFails = 0;
+
+ for (int brokerPortNumber : getBrokerPortNumbers())
+ {
+ try
+ {
+ getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ connectionSuccesses++;
+ }
+ catch(JMSException e)
+ {
+ assertTrue(e.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ connectionFails++;
+ }
+ }
+
+ assertEquals("Unexpected number of failed connections", NUMBER_OF_NODES - 1, connectionFails);
+ assertEquals("Unexpected number of successful connections", 1, connectionSuccesses);
+ }
+
+ public void testClusterThatLosesNodeStillAllowsConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+
+ // verify that JMS persistence operations are working
+ assertProducingConsuming(subsequentConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterThatLosesAllButOneNodeRefusesConnection() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+ assertNotNull(initialConnection);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ final int subsequentPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(subsequentConnection);
+
+ killBroker(subsequentPortNumber);
+
+ final Connection finalConnection = getConnectionToNodeInCluster();
+ assertNull(finalConnection);
+
+ closeConnection(initialConnection);
+ }
+
+ public void testClusterWithRestartedNodeStillAllowsConnection() throws Exception
+ {
+ final Connection connection = getConnectionToNodeInCluster();
+ assertNotNull(connection);
+
+ final int brokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+
+ _clusterCreator.stopNode(brokerPortNumber);
+ _clusterCreator.startNode(brokerPortNumber);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+ assertNotNull(subsequentConnection);
+ }
+
+ public void testClusterLosingNodeRetainsData() throws Exception
+ {
+ final Connection initialConnection = getConnectionToNodeInCluster();
+
+ final String queueNamePrefix = getTestQueueName();
+ final String inbuiltExchangeQueueUrl = "direct://amq.direct/" + queueNamePrefix + "1/" + queueNamePrefix + "1?durable='true'";
+ final String customExchangeQueueUrl = "direct://my.exchange/" + queueNamePrefix + "2/" + queueNamePrefix + "2?durable='true'";
+
+ populateBrokerWithData(initialConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+
+ killConnectionBrokerAndWaitForNewMasterElection(initialConnection);
+
+ final Connection subsequentConnection = getConnectionToNodeInCluster();
+
+ assertNotNull("no valid connection obtained", subsequentConnection);
+
+ checkBrokerData(subsequentConnection, inbuiltExchangeQueueUrl, customExchangeQueueUrl);
+ }
+
+ public void testRecoveryOfOutOfDateNode() throws Exception
+ {
+ /*
+ * TODO: Implement
+ *
+ * Cant yet find a way to control cleaning in a deterministic way to allow provoking
+ * a node to become out of date. We do now know that even a new joiner to the group
+ * can throw the InsufficientLogException, so ensuring an existing cluster of nodes has
+ * done *any* cleaning and then adding a new node should be sufficient to cause this.
+ */
+ }
+
+ private void populateBrokerWithData(final Connection connection, final String... queueUrls) throws JMSException, Exception
+ {
+ populateBrokerWithData(connection, 1, queueUrls);
+ }
+
+ private void populateBrokerWithData(final Connection connection, int noOfMessages, final String... queueUrls) throws JMSException, Exception
+ {
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ session.createConsumer(queue).close();
+ sendMessage(session, queue, noOfMessages);
+ }
+ }
+
+ private void checkBrokerData(final Connection connection, final String... queueUrls) throws JMSException
+ {
+ connection.start();
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ for (final String queueUrl : queueUrls)
+ {
+ final Queue queue = session.createQueue(queueUrl);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ final Message message = consumer.receive(1000);
+ session.commit();
+ assertNotNull("Queue " + queue + " should have message", message);
+ assertEquals("Queue " + queue + " message has unexpected content", 0, message.getIntProperty(INDEX));
+ }
+ }
+
+ private Connection getConnectionToNodeInCluster() throws URLSyntaxException
+ {
+ Connection connection = null;
+ Set<Integer> runningBrokerPorts = getBrokerPortNumbers();
+
+ for (int brokerPortNumber : runningBrokerPorts)
+ {
+ try
+ {
+ connection = getConnection(_clusterCreator.getConnectionUrlForSingleNode(brokerPortNumber));
+ break;
+ }
+ catch(JMSException je)
+ {
+ assertTrue(je.getMessage().contains("Virtual host '" + VIRTUAL_HOST + "' is not active"));
+ }
+ }
+ return connection;
+ }
+
+ private void killConnectionBrokerAndWaitForNewMasterElection(final Connection initialConnection) throws IOException,
+ InterruptedException
+ {
+ try
+ {
+ // NewMasterEvent is received twice: first for the existing master,
+ // second for a new master
+ CountDownLatch newMasterLatch = new CountDownLatch(2);
+ _clusterCreator.startMonitorNode();
+ _clusterCreator.statListeningForNewMasterEvent(newMasterLatch);
+
+ final int initialPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ killBroker(initialPortNumber);
+
+ assertTrue("New master was not elected", newMasterLatch.await(30, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ _clusterCreator.shutdownMonitor();
+ }
+ }
+
+ private void assertProducingConsuming(final Connection connection) throws JMSException, Exception
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ Message m2 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 2 is not received", m2);
+ assertEquals("Unexpected second message received", 1, m2.getIntProperty(INDEX));
+ session.commit();
+ }
+
+ private void closeConnection(final Connection initialConnection)
+ {
+ try
+ {
+ initialConnection.close();
+ }
+ catch(Exception e)
+ {
+ // ignore.
+ // java.net.SocketException is seen sometimes on active connection
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
index 7e5ef3f94c..eaa3c3eba4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreFactory.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAMessageStoreSmokeTest.java
@@ -19,22 +19,25 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class BDBMessageStoreFactory implements MessageStoreFactory
+public class HAMessageStoreSmokeTest extends QpidTestCase
{
+ private final BDBHAMessageStore _store = new BDBHAMessageStore();
+ private final XMLConfiguration _config = new XMLConfiguration();
- @Override
- public MessageStore createMessageStore()
+ public void testMissingHAConfigThrowsException() throws Exception
{
- return new BDBMessageStore();
+ try
+ {
+ _store.configure("test", _config);
+ fail("Expected an exception to be thrown");
+ }
+ catch (ConfigurationException ce)
+ {
+ assertTrue(ce.getMessage().contains("BDB HA configuration key not found"));
+ }
}
-
- @Override
- public String getStoreClassName()
- {
- return BDBMessageStore.class.getSimpleName();
- }
-
-}
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
new file mode 100644
index 0000000000..43cfa5f4d5
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.monitor.GroupChangeEvent;
+import com.sleepycat.je.rep.monitor.JoinGroupEvent;
+import com.sleepycat.je.rep.monitor.LeaveGroupEvent;
+import com.sleepycat.je.rep.monitor.Monitor;
+import com.sleepycat.je.rep.monitor.MonitorChangeListener;
+import com.sleepycat.je.rep.monitor.MonitorConfig;
+import com.sleepycat.je.rep.monitor.NewMasterEvent;
+
+public class HATestClusterCreator
+{
+ protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
+
+ private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
+ private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+ private static final String SINGLE_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+
+ private static final int CYCLECOUNT = 2;
+ private static final int RETRIES = 2;
+ private static final int CONNECTDELAY = 1000;
+
+ private final QpidBrokerTestCase _testcase;
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
+ private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
+ private final String _virtualHostName;
+ private final String _configKeyPrefix;
+
+ private final String _ipAddressOfBroker;
+ private final String _groupName ;
+ private final int _numberOfNodes;
+ private int _bdbHelperPort;
+ private int _primaryBrokerPort;
+ private Monitor _monitor;
+
+ public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
+ {
+ _testcase = testcase;
+ _virtualHostName = virtualHostName;
+ _groupName = "group" + _testcase.getName();
+ _ipAddressOfBroker = getIpAddressOfBrokerHost();
+ _numberOfNodes = numberOfNodes;
+ _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".store.";
+ _bdbHelperPort = 0;
+ }
+
+ public void configureClusterNodes() throws Exception
+ {
+ int brokerPort = _testcase.findFreePort();
+
+ for (int i = 0; i < _numberOfNodes; i++)
+ {
+ int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
+ _brokerPortToBdbPortMap.put(brokerPort, bdbPort);
+
+ LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
+ if (_bdbHelperPort == 0)
+ {
+ _bdbHelperPort = bdbPort;
+ }
+
+ configureClusterNode(brokerPort, bdbPort);
+ collectConfig(brokerPort, _testcase.getTestConfiguration(), _testcase.getTestVirtualhosts());
+
+ brokerPort = _testcase.getNextAvailable(bdbPort + 1);
+ }
+ }
+
+ public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
+ final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary");
+ brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary));
+ _primaryBrokerPort = brokerConfigEntry.getKey();
+ }
+
+ /**
+ * @param configKeySuffix "highAvailability.designatedPrimary", for example
+ * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
+ */
+ private String getConfigKey(String configKeySuffix)
+ {
+ final String configKey = StringUtils.substringAfter(_configKeyPrefix + configKeySuffix, "virtualhosts.");
+ return configKey;
+ }
+
+ public void startNode(final int brokerPortNumber) throws Exception
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+
+ _testcase.setTestConfiguration(brokerConfigHolder.getTestConfiguration());
+ _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
+
+ _testcase.startBroker(brokerPortNumber);
+ }
+
+ public void startCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ startNode(brokerPortNumber);
+ }
+ }
+
+ public void startClusterParallel() throws Exception
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
+ try
+ {
+ List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+ Future<Object> future = executor.submit(new Callable<Object>()
+ {
+ public Object call()
+ {
+ try
+ {
+ _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
+ brokerConfigHolder.getTestVirtualhosts());
+ return "OK";
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+ });
+ brokers.add(future);
+ }
+ for (Future<Object> future : brokers)
+ {
+ Object result = future.get(30, TimeUnit.SECONDS);
+ LOGGER.debug("Node startup result:" + result);
+ if (result instanceof Exception)
+ {
+ throw (Exception) result;
+ }
+ else if (!"OK".equals(result))
+ {
+ throw new Exception("One of the cluster nodes is not started");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ stopCluster();
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+
+ }
+
+ public void stopNode(final int brokerPortNumber)
+ {
+ _testcase.stopBroker(brokerPortNumber);
+ }
+
+ public void stopCluster() throws Exception
+ {
+ shutdownMonitor();
+ for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ {
+ try
+ {
+ stopNode(brokerPortNumber);
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
+ }
+ }
+ }
+
+ public int getBrokerPortNumberFromConnection(Connection connection)
+ {
+ final AMQConnection amqConnection = (AMQConnection)connection;
+ return amqConnection.getActiveBrokerDetails().getPort();
+ }
+
+ public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
+ {
+ final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
+ allBrokerPorts.remove(activeBrokerPort);
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int inactiveBrokerPort = allBrokerPorts.iterator().next();
+ return inactiveBrokerPort;
+ }
+
+ public int getBdbPortForBrokerPort(final int brokerPortNumber)
+ {
+ return _brokerPortToBdbPortMap.get(brokerPortNumber);
+ }
+
+ public Set<Integer> getBdbPortNumbers()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
+ }
+
+ public AMQConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
+ {
+ final StringBuilder brokerList = new StringBuilder();
+
+ for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
+ {
+ int brokerPortNumber = itr.next();
+
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, CONNECTDELAY, RETRIES));
+ if (itr.hasNext())
+ {
+ brokerList.append(";");
+ }
+ }
+
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, CYCLECOUNT));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber) throws URLSyntaxException
+ {
+ String url = String.format(SINGLE_BROKER_URL_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ return new AMQConnectionURL(url);
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + _testcase.getName() + bdbPort;
+ }
+
+ public String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _ipAddressOfBroker + ":" + bdbPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ if (_bdbHelperPort == 0)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+
+ return _ipAddressOfBroker + ":" + _bdbHelperPort;
+ }
+
+ public void setHelperHostPort(int bdbHelperPort)
+ {
+ _bdbHelperPort = bdbHelperPort;
+ }
+
+ public int getBrokerPortNumberOfPrimary()
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ return _primaryBrokerPort;
+ }
+
+ public int getBrokerPortNumberOfSecondaryNode()
+ {
+ final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
+ portNumbers.remove(getBrokerPortNumberOfPrimary());
+ return portNumbers.iterator().next();
+ }
+
+ public Set<Integer> getBrokerPortNumbersForNodes()
+ {
+ return new HashSet<Integer>(_brokerConfigurations.keySet());
+ }
+
+ private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
+ {
+ final String nodeName = getNodeNameForNodeAt(bdbPort);
+
+ _testcase.setConfigurationProperty(_configKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.groupName", _groupName);
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeName", nodeName);
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+ _testcase.setConfigurationProperty(_configKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
+ // TODO replication policy
+ }
+
+ public String getIpAddressOfBrokerHost()
+ {
+ String brokerHost = _testcase.getBroker().getHost();
+ try
+ {
+ return InetAddress.getByName(brokerHost).getHostAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
+ }
+ }
+
+ private void collectConfig(final int brokerPortNumber, XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder((XMLConfiguration) testConfiguration.clone(),
+ (XMLConfiguration) testVirtualhosts.clone()));
+ }
+
+ public class BrokerConfigHolder
+ {
+ private final XMLConfiguration _testConfiguration;
+ private final XMLConfiguration _testVirtualhosts;
+
+ public BrokerConfigHolder(XMLConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
+ {
+ _testConfiguration = testConfiguration;
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public XMLConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+ }
+
+ public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
+ {
+ final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
+ final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
+
+ final String configKey = getConfigKey("highAvailability.nodeHostPort");
+ final String oldBdbHostPort = virtualHostConfig.getString(configKey);
+
+ final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ final String oldHost = oldHostAndPort[0];
+
+ final String newBdbHostPort = oldHost + ":" + newBdbPort;
+
+ virtualHostConfig.setProperty(configKey, newBdbHostPort);
+ collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
+ }
+
+ public void startMonitorNode()
+ {
+ shutdownMonitor();
+
+ MonitorConfig config = new MonitorConfig();
+ config.setGroupName(_groupName);
+ int monitorPort = _testcase.findFreePort();
+ config.setNodeName(getNodeNameForNodeAt(monitorPort));
+ config.setNodeHostPort("" + monitorPort);
+ config.setHelperHosts(getHelperHostPort());
+
+ _monitor = new Monitor(config);
+
+ ReplicationNode currentMaster = _monitor.register();
+ LOGGER.info("Current master " + currentMaster.getName());
+ }
+
+ public void startListening(MonitorChangeListener listener) throws IOException
+ {
+ _monitor.startListener(listener);
+ }
+
+ public void statListeningForNewMasterEvent(final CountDownLatch latch) throws IOException
+ {
+ startListening(new MonitorChangeListenerSupport(){
+ @Override
+ public void notify(NewMasterEvent newMasterEvent)
+ {
+ LOGGER.debug("New master is elected " + newMasterEvent.getMasterName());
+ latch.countDown();
+ }
+ });
+ }
+
+ public void shutdownMonitor()
+ {
+ if (_monitor != null)
+ {
+ try
+ {
+ _monitor.shutdown();
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Monitor shutdown error:", e);
+ }
+ }
+ }
+
+ public static class MonitorChangeListenerSupport implements MonitorChangeListener
+ {
+
+ @Override
+ public void notify(NewMasterEvent newMasterEvent)
+ {
+ }
+
+ @Override
+ public void notify(GroupChangeEvent groupChangeEvent)
+ {
+ }
+
+ @Override
+ public void notify(JoinGroupEvent joinGroupEvent)
+ {
+ }
+
+ @Override
+ public void notify(LeaveGroupEvent leaveGroupEvent)
+ {
+ }
+ }
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
index ba5ca842bf..23fd9bc24f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
@@ -24,7 +24,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
@@ -94,7 +94,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
{
assertEquals("Unexpected store version", -1, getStoreVersion());
_upgrader.upgradeIfNecessary();
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
assertContent();
}
@@ -112,7 +112,7 @@ public class UpgraderTest extends AbstractUpgradeTestCase
List<String> expectedDatabases = new ArrayList<String>();
expectedDatabases.add(Upgrader.VERSION_DB_NAME);
assertEquals("Expectedonly VERSION table in initially empty store after upgrade: ", expectedDatabases, databaseNames);
- assertEquals("Unexpected store version", BDBMessageStore.VERSION, getStoreVersion());
+ assertEquals("Unexpected store version", AbstractBDBMessageStore.VERSION, getStoreVersion());
nonExistentStoreLocation.delete();
}
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index 1f7f91de9a..0f7cc7866f 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -25,8 +25,8 @@
<name>localhost</name>
<localhost>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
- <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
@@ -86,8 +86,8 @@
<name>development</name>
<development>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
- <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
@@ -125,8 +125,8 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
- <!--<factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbystore</environment-path>-->
</store>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 5f472b6ddd..59c6926b76 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -103,14 +102,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
return getConfig().subset("store");
}
- public String getMessageStoreFactoryClass()
+ public String getMessageStoreClass()
{
- return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
+ return getStringValue("store.class", MemoryMessageStore.class.getName());
}
- public void setMessageStoreFactoryClass(String storeFactoryClass)
+ public void setMessageStoreClass(String storeFactoryClass)
{
- getConfig().setProperty("store.factoryclass", storeFactoryClass);
+ getConfig().setProperty("store.class", storeFactoryClass);
}
public List getExchanges()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
index 9b5ceef35f..c681126c11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -23,12 +23,20 @@ public enum Event
{
BEFORE_INIT,
AFTER_INIT,
+
BEFORE_ACTIVATE,
AFTER_ACTIVATE,
+
BEFORE_PASSIVATE,
AFTER_PASSIVATE,
+
BEFORE_CLOSE,
AFTER_CLOSE,
+
+ BEFORE_QUIESCE,
+ AFTER_QUIESCE,
+ BEFORE_RESTART,
+
PERSISTENT_MESSAGE_SIZE_OVERFULL,
- PERSISTENT_MESSAGE_SIZE_UNDERFULL,
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
index 21ae3924b8..bf3de2611d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
@@ -24,9 +24,12 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
public class EventManager
{
private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class);
+ private static final Logger _LOGGER = Logger.getLogger(EventManager.class);
public synchronized void addEventListener(EventListener listener, Event... events)
{
@@ -46,6 +49,11 @@ public class EventManager
{
if (_listeners.containsKey(event))
{
+ if(_LOGGER.isDebugEnabled())
+ {
+ _LOGGER.debug("Received event " + event);
+ }
+
for (EventListener listener : _listeners.get(event))
{
listener.event(event);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
index a35db62b03..59483751ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/HAMessageStore.java
@@ -19,9 +19,11 @@
*/
package org.apache.qpid.server.store;
-public interface MessageStoreFactory
+public interface HAMessageStore extends MessageStore
{
- MessageStore createMessageStore();
-
- String getStoreClassName();
+ /**
+ * Used to indicate that a store requires to make itself unavailable for read and read/write
+ * operations.
+ */
+ void passivate();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 59624b7a75..7b98b30860 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -83,19 +83,19 @@ public class MemoryMessageStore extends NullMessageStore
@Override
public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
}
@Override
public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
deleted file mode 100644
index 8724f102c6..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class MemoryMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new MemoryMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return MemoryMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
index 7cbdede85e..2783637b2a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/State.java
@@ -20,19 +20,30 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+
public enum State
{
-
+ /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
INITIAL,
- CONFIGURING,
- CONFIGURED,
- RECOVERING,
+
+ INITIALISING,
+ /**
+ * The initial set-up of the store has completed.
+ * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+ *
+ * From the point of view of the user, the store is essentially stopped.
+ */
+ INITIALISED,
+
+ ACTIVATING,
ACTIVE,
- QUIESCING,
- QUIESCED,
+
CLOSING,
- CLOSED;
-
+ CLOSED,
+ QUIESCING,
+ /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */
+ QUIESCED;
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
index 5998be5bb6..613b329beb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.store;
import java.util.EnumMap;
import java.util.Map;
+import org.apache.qpid.server.store.StateManager.Transition;
+
public class StateManager
{
private State _state = State.INITIAL;
@@ -70,16 +72,23 @@ public class StateManager
}
- public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT);
- public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT);
- public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE);
- public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE);
+ public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT);
+ public static final Transition INITALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT);
+
+ public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
+ public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
+
+ public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
- public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE);
- public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE);
- public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+
+ public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE);
+
+ public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE);
+ public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE);
+
+ public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART);
public StateManager(final EventManager eventManager)
@@ -105,16 +114,6 @@ public class StateManager
return _state;
}
- public synchronized void stateTransition(final State current, final State desired)
- {
- if (_state != current)
- {
- throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
- + "; currently in state: " + _state);
- }
- attainState(desired);
- }
-
public synchronized void attainState(State desired)
{
Transition transition = null;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index de1ce1a9db..c065eb263b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -260,7 +260,7 @@ public class DerbyMessageStore implements MessageStore
ConfigurationRecoveryHandler configRecoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
commonConfiguration(name, storeConfiguration);
@@ -276,13 +276,13 @@ public class DerbyMessageStore implements MessageStore
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
recoverConfiguration(_configRecoveryHandler);
@@ -716,7 +716,7 @@ public class DerbyMessageStore implements MessageStore
public void close() throws Exception
{
_closed.getAndSet(true);
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
+ _stateManager.attainState(State.CLOSING);
try
{
@@ -737,7 +737,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
+ _stateManager.attainState(State.CLOSED);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
deleted file mode 100644
index 12d7f64a8d..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStoreFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.derby;
-
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
-
-public class DerbyMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new DerbyMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return DerbyMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index b05025467d..5a14092930 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -59,8 +59,8 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
@@ -173,7 +173,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
- _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
+ _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
configureMessageStore(hostConfig);
@@ -329,20 +329,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
- private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
+ private MessageStore initialiseMessageStore(final String messageStoreClass) throws Exception
{
- final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+ final Class<?> clazz = Class.forName(messageStoreClass);
final Object o = clazz.newInstance();
- if (!(o instanceof MessageStoreFactory))
+ if (!(o instanceof MessageStore))
{
- throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+ throw new ClassCastException("Message store factory class must implement " + MessageStore.class +
". Class " + clazz + " does not.");
}
- final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
- final MessageStore messageStore = messageStoreFactory.createMessageStore();
- final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+ final MessageStore messageStore = (MessageStore) o;
+ final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
@@ -366,7 +365,10 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
private void activateNonHAMessageStore() throws Exception
{
- _messageStore.activate();
+ if (!(_messageStore instanceof HAMessageStore))
+ {
+ _messageStore.activate();
+ }
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -801,42 +803,42 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr
}
private final class BeforeActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- try
- {
- _exchangeRegistry.initialise();
- initialiseModel(_vhostConfig);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to initialise virtual host after state change", e);
- }
- }
- }
-
- private final class AfterActivationListener implements EventListener
- {
- @Override
- public void event(Event event)
- {
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
- try
- {
- _brokerMBean.register();
- }
- catch (JMException e)
- {
- throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
- }
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ _exchangeRegistry.initialise();
+ initialiseModel(_vhostConfig);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
+
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ try
+ {
+ _brokerMBean.register();
+ }
+ catch (JMException e)
+ {
+ throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ }
- _state = State.ACTIVE;
- }
- }
+ _state = State.ACTIVE;
+ }
+ }
- public class BeforePassivationListener implements EventListener
+ private final class BeforePassivationListener implements EventListener
{
public void event(Event event)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index d26e286c90..d34d1bbef3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -36,7 +36,7 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -165,7 +165,7 @@ public class AMQBrokerManagerMBeanTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
- configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index c4c93acfb6..50e7f0588b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -27,7 +27,6 @@ import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -162,7 +161,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
- getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
// Start the broker now.
super.createBroker();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index e123a968a4..337ff194c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.UUID;
-
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
@@ -37,7 +35,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +53,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
XMLConfiguration configXml = new XMLConfiguration();
configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
ServerConfiguration configuration = new ServerConfiguration(configXml);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 52ad4a7c5b..a8fad96063 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -108,7 +107,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
PropertiesConfiguration env = new PropertiesConfiguration();
final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(getClass().getName(), env);
- vhostConfig.setMessageStoreFactoryClass(TestableMemoryMessageStoreFactory.class.getName());
+ vhostConfig.setMessageStoreClass(TestableMemoryMessageStore.class.getName());
_virtualHost = new VirtualHostImpl(ApplicationRegistry.getInstance(), vhostConfig);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
index a1cbb2cbc8..48e631a0f4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.logging.subjects.TestBlankSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
@@ -41,7 +40,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecover
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory;
+import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -262,14 +261,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase
protected MessageStore createStore() throws Exception
{
- String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY);
- if (storeFactoryClass == null)
+ String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
+ if (storeClass == null)
{
- storeFactoryClass = DerbyMessageStoreFactory.class.getName();
+ storeClass = DerbyMessageStore.class.getName();
}
CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
- MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance();
- return factory.createMessageStore();
+ MessageStore messageStore = (MessageStore) Class.forName(storeClass).newInstance();
+ return messageStore;
}
public void testRecordXid() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3fb0776083..64048d294b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -59,7 +59,6 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
/**
* This tests the MessageStores by using the available interfaces.
@@ -103,7 +102,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
String storePath = System.getProperty("QPID_WORK") + "/" + getName();
_config = new PropertiesConfiguration();
- _config.addProperty("store.factoryclass", getTestProfileMessageStoreFactoryClassName());
+ _config.addProperty("store.class", getTestProfileMessageStoreClassName());
_config.addProperty("store.environment-path", storePath);
cleanup(new File(storePath));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
index 42746f9119..c6ef35d255 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
@@ -73,11 +73,11 @@ public class OperationalLoggingListenerTest extends TestCase
}
- messageStore.attainState(State.CONFIGURING);
+ messageStore.attainState(State.INITIALISING);
assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size());
assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString());
- messageStore.attainState(State.CONFIGURED);
+ messageStore.attainState(State.INITIALISED);
assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString());
assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString());
@@ -86,7 +86,7 @@ public class OperationalLoggingListenerTest extends TestCase
assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString());
}
- messageStore.attainState(State.RECOVERING);
+ messageStore.attainState(State.ACTIVATING);
assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size());
assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
index 97c88ca1d3..18efb976eb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
@@ -45,8 +45,8 @@ public class StateManagerTest extends TestCase implements EventListener
{
assertEquals(State.INITIAL, _manager.getState());
- _manager.stateTransition(State.INITIAL, State.CONFIGURING);
- assertEquals(State.CONFIGURING, _manager.getState());
+ _manager.attainState(State.INITIALISING);
+ assertEquals(State.INITIALISING, _manager.getState());
}
public void testStateTransitionDisallowed()
@@ -55,7 +55,7 @@ public class StateManagerTest extends TestCase implements EventListener
try
{
- _manager.stateTransition(State.ACTIVE, State.CLOSING);
+ _manager.attainState(State.CLOSING);
fail("Exception not thrown");
}
catch (IllegalStateException e)
@@ -98,22 +98,29 @@ public class StateManagerTest extends TestCase implements EventListener
public void testValidStateTransitions()
{
assertEquals(State.INITIAL, _manager.getState());
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.RESTART);
- performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.CLOSE_ACTIVE);
performValidTransition(StateManager.CLOSE_COMPLETE);
+
+ _manager = new StateManager(this);
+ assertEquals(State.INITIAL, _manager.getState());
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
+ performValidTransition(StateManager.CLOSE_INITIALISED);
+ performValidTransition(StateManager.CLOSE_COMPLETE);
_manager = new StateManager(this);
- performValidTransition(StateManager.CONFIGURE);
- performValidTransition(StateManager.CONFIGURE_COMPLETE);
- performValidTransition(StateManager.RECOVER);
+ performValidTransition(StateManager.INITIALISE);
+ performValidTransition(StateManager.INITALISE_COMPLETE);
performValidTransition(StateManager.ACTIVATE);
+ performValidTransition(StateManager.ACTIVATE_COMPLETE);
performValidTransition(StateManager.QUIESCE);
performValidTransition(StateManager.QUIESCE_COMPLETE);
performValidTransition(StateManager.CLOSE_QUIESCED);
@@ -132,54 +139,50 @@ public class StateManagerTest extends TestCase implements EventListener
{
assertEquals(State.INITIAL, _manager.getState());
-
- performInvalidTransitions(StateManager.CONFIGURE, State.CONFIGURED);
- performInvalidTransitions(StateManager.CONFIGURE_COMPLETE, State.RECOVERING);
- performInvalidTransitions(StateManager.RECOVER, State.ACTIVE);
- performInvalidTransitions(StateManager.ACTIVATE, State.QUIESCING, State.CLOSING);
+ performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
+ performInvalidTransitions(StateManager.INITALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
+ performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE);
+ performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
- performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.RECOVERING, State.CLOSING);
+ performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);
performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
performInvalidTransitions(StateManager.CLOSE_COMPLETE);
-
-
-
}
- private void performInvalidTransitions(StateManager.Transition preTransition, State... validTransitions)
+ private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates)
{
if(preTransition != null)
{
performValidTransition(preTransition);
}
- EnumSet<State> nextStates = EnumSet.allOf(State.class);
+ EnumSet<State> endStates = EnumSet.allOf(State.class);
- if(validTransitions != null)
+ if(validEndStates != null)
{
- for(State state: validTransitions)
+ for(State state: validEndStates)
{
- nextStates.remove(state);
+ endStates.remove(state);
}
}
- for(State nextState : nextStates)
+ for(State invalidEndState : endStates)
{
- performInvalidStateTransition(nextState);
+ performInvalidStateTransition(invalidEndState);
}
}
- private void performInvalidStateTransition(State state)
+ private void performInvalidStateTransition(State invalidEndState)
{
try
{
_event = null;
State startState = _manager.getState();
- _manager.attainState(state);
- fail("Invalid state transition performed: " + startState + " to " + state);
+ _manager.attainState(invalidEndState);
+ fail("Invalid state transition performed: " + startState + " to " + invalidEndState);
}
catch(IllegalStateException e)
{
@@ -188,6 +191,7 @@ public class StateManagerTest extends TestCase implements EventListener
assertNull("No event should have be fired", _event);
}
+ @Override
public void event(Event event)
{
_event = event;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
deleted file mode 100644
index 44070f22ad..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new TestableMemoryMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return TestableMemoryMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index f8200bf1cd..8a18aaff9e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -67,10 +66,10 @@ public class InternalBrokerBaseCase extends QpidTestCase
super.setUp();
_configXml.addProperty("virtualhosts.virtualhost.name", "test");
- _configXml.addProperty("virtualhosts.virtualhost.test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
_configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
- _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
+ _configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
createBroker();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
index 87eb0f9d16..b8ba76e43d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostImplTest.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -192,7 +192,7 @@ public class VirtualHostImplTest extends QpidTestCase
writer.write(" <name>" + vhostName + "</name>");
writer.write(" <" + vhostName + ">");
writer.write(" <store>");
- writer.write(" <factoryclass>" + MemoryMessageStoreFactory.class.getName() + "</factoryclass>");
+ writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>");
writer.write(" </store>");
if(exchangeName != null && !dontDeclare)
{
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
index cbf6caf141..8a14466aeb 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -110,8 +110,8 @@ public class QpidTestCase extends TestCase
}
}
- protected static final String MS_FACTORY_CLASS_NAME_KEY = "messagestorefactory.class.name";
- protected static final String MEMORY_STORE_FACTORY_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStoreFactory";
+ protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name";
+ protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore";
private static List<String> _exclusionList;
@@ -140,12 +140,12 @@ public class QpidTestCase extends TestCase
}
}
- public String getTestProfileMessageStoreFactoryClassName()
+ public String getTestProfileMessageStoreClassName()
{
- final String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY);
- _logger.debug("MS_FACTORY_CLASS_NAME_KEY " + storeFactoryClass);
+ final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY);
+ _logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass);
- return storeFactoryClass != null ? storeFactoryClass : MEMORY_STORE_FACTORY_CLASS_NAME ;
+ return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ;
}
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 1261a1bdea..8c6d815fee 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -349,7 +349,7 @@
<sysproperty key="log4j.configuration" value="${log4j.configuration}"/>
<sysproperty key="java.naming.factory.initial" value="${java.naming.factory.initial}"/>
<sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
- <sysproperty key="messagestorefactory.class.name" value="${messagestorefactory.class.name}" />
+ <sysproperty key="messagestore.class.name" value="${messagestore.class.name}" />
<sysproperty key="test.output" value="${module.results}"/>
<sysproperty key="qpid.amqp.version" value="${qpid.amqp.version}"/>
diff --git a/qpid/java/systests/etc/log.properties b/qpid/java/systests/etc/log.properties
new file mode 100644
index 0000000000..745c5187c9
--- /dev/null
+++ b/qpid/java/systests/etc/log.properties
@@ -0,0 +1,2 @@
+com.sleepycat.je.util.FileHandler.level=ALL
+com.sleepycat.je.util.ConsoleHandler.level=ALL
diff --git a/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml
index 216046b40b..ce16523f13 100644
--- a/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml
+++ b/qpid/java/systests/etc/virtualhosts-systests-bdb-settings.xml
@@ -26,7 +26,7 @@
<name>localhost</name>
<localhost>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/localhost-store</environment-path>
</store>
</localhost>
@@ -36,7 +36,7 @@
<name>development</name>
<development>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/development-store</environment-path>
</store>
</development>
@@ -46,7 +46,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
<environment-path>${work}/bdbstore/test-store</environment-path>
</store>
</test>
diff --git a/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml b/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml
index d494768e91..08a40ca812 100644
--- a/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml
+++ b/qpid/java/systests/etc/virtualhosts-systests-derby-settings.xml
@@ -26,7 +26,7 @@
<virtualhost>
<localhost>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/localhost-store</environment-path>
</store>
</localhost>
@@ -35,7 +35,7 @@
<virtualhost>
<development>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/development-store</environment-path>
</store>
</development>
@@ -44,7 +44,7 @@
<virtualhost>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.derby.DerbyMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.derby.DerbyMessageStore</class>
<environment-path>${QPID_WORK}/derbyDB/test-store</environment-path>
</store>
</test>
diff --git a/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml b/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml
index f8e9fde8ca..20908e6eb4 100644
--- a/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml
+++ b/qpid/java/systests/etc/virtualhosts-systests-firewall-2.xml
@@ -26,7 +26,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
</test>
</virtualhost>
@@ -35,7 +35,7 @@
<name>test2</name>
<test2>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<security>
<firewall default-action="deny">
diff --git a/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml b/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml
index 95db02672a..90377f345f 100644
--- a/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml
+++ b/qpid/java/systests/etc/virtualhosts-systests-firewall-3.xml
@@ -26,7 +26,7 @@
<name>test</name>
<test>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
</test>
</virtualhost>
@@ -35,7 +35,7 @@
<name>test2</name>
<test2>
<store>
- <factoryclass>org.apache.qpid.server.store.MemoryMessageStoreFactory</factoryclass>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
<security>
<firewall default-action="deny"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
deleted file mode 100644
index 6497a640d2..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class SlowMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new SlowMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return SlowMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
index 283fb4ed4c..bcee4e4930 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
@@ -58,7 +58,13 @@ public class StoreOverfullTest extends QpidBrokerTestCase
String.valueOf(OVERFULL_SIZE));
setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size",
String.valueOf(UNDERFULL_SIZE));
- setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000");
+
+ if(getTestProfileMessageStoreClassName().contains("BDB"))
+ {
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).name", "je.log.fileMax");
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).value", "1000000");
+ }
+
super.setUp();
_producerConnection = getConnection();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
index 91f5cb7770..ee81e7c372 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -54,7 +54,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.factoryclass", "org.apache.qpid.server.store.SlowMessageStoreFactory");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore");
setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
setConfigurationProperty("management.enabled", "false");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index d9c259c389..1ef6164db6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -79,8 +79,16 @@ public class JMXTestUtils
public void open() throws Exception
{
+ open(0); // Zero signifies default broker to QBTC.
+ }
+
+ public void open(final int brokerPort) throws Exception
+ {
+ int actualBrokerPort = _test.getPort(brokerPort);
+ int managementPort = _test.getManagementPort(actualBrokerPort);
+
_jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
- _test.getManagementPort(_test.getPort()), _user, _password);
+ managementPort, _user, _password);
_mbsc = _jmxc.getMBeanServerConnection();
}
@@ -319,7 +327,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
- _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
+ _test.assertEquals("Unexpected number of objects matching " + managedClass + " returned", 1, objectNames.size());
ObjectName objectName = objectNames.iterator().next();
_test.getLogger().info("Loading: " + objectName);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 4d4dee001b..9f019443f5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -64,7 +64,7 @@ import org.apache.qpid.server.ProtocolExclusion;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory;
+import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.LogMonitor;
@@ -74,7 +74,6 @@ import org.apache.qpid.util.LogMonitor;
*/
public class QpidBrokerTestCase extends QpidTestCase
{
-
public enum BrokerType
{
EXTERNAL /** Test case relies on a Broker started independently of the test-suite */,
@@ -110,8 +109,10 @@ public class QpidBrokerTestCase extends QpidTestCase
}
// system properties
+ private static final String TEST_VIRTUALHOSTS = "test.virtualhosts";
+ private static final String TEST_CONFIG = "test.config";
private static final String BROKER_LANGUAGE = "broker.language";
- private static final String BROKER_TYPE = "broker.type";
+ protected static final String BROKER_TYPE = "broker.type";
private static final String BROKER_COMMAND = "broker.command";
private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work";
@@ -190,7 +191,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
super();
}
-
+
public Logger getLogger()
{
return QpidBrokerTestCase._logger;
@@ -346,11 +347,16 @@ public class QpidBrokerTestCase extends QpidTestCase
public void startBroker(int port) throws Exception
{
+ startBroker(port, _testConfiguration, _testVirtualhosts);
+ }
+
+ public void startBroker(int port, XMLConfiguration testConfiguration, XMLConfiguration virtualHosts) throws Exception
+ {
port = getPort(port);
// Save any configuration changes that have been made
- saveTestConfiguration();
- saveTestVirtualhosts();
+ String testConfig = saveTestConfiguration(port, testConfiguration);
+ String virtualHostsConfig = saveTestVirtualhosts(port, virtualHosts);
if(_brokers.get(port) != null)
{
@@ -360,7 +366,11 @@ public class QpidBrokerTestCase extends QpidTestCase
if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker())
{
setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false));
- saveTestConfiguration();
+ testConfig = saveTestConfiguration(port, testConfiguration);
+ _logger.info("Set test.config property to: " + testConfig);
+ _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig);
+ setSystemProperty(TEST_CONFIG, testConfig);
+ setSystemProperty(TEST_VIRTUALHOSTS, virtualHostsConfig);
BrokerOptions options = new BrokerOptions();
options.setConfigFile(_configFile.getAbsolutePath());
@@ -388,16 +398,16 @@ public class QpidBrokerTestCase extends QpidTestCase
_logger.info("starting external broker: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+"));
pb.redirectErrorStream(true);
- Map<String, String> env = pb.environment();
+ Map<String, String> processEnv = pb.environment();
String qpidHome = System.getProperty(QPID_HOME);
- env.put(QPID_HOME, qpidHome);
+ processEnv.put(QPID_HOME, qpidHome);
//Augment Path with bin directory in QPID_HOME.
- env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
+ processEnv.put("PATH", processEnv.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
//Add the test name to the broker run.
// DON'T change PNAME, qpid.stop needs this value.
- env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\"");
- env.put("QPID_WORK", qpidWork);
+ processEnv.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\"");
+ processEnv.put("QPID_WORK", qpidWork);
// Use the environment variable to set amqj.logging.level for the broker
// The value used is a 'server' value in the test configuration to
@@ -412,7 +422,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
for (Map.Entry<String, String> entry : _env.entrySet())
{
- env.put(entry.getKey(), entry.getValue());
+ processEnv.put(entry.getKey(), entry.getValue());
}
}
@@ -429,25 +439,25 @@ public class QpidBrokerTestCase extends QpidTestCase
setSystemProperty("root.logging.level");
}
+ // set test.config and test.virtualhosts
+ String qpidOpts = " -D" + TEST_CONFIG + "=" + testConfig + " -D" + TEST_VIRTUALHOSTS + "=" + virtualHostsConfig;
- String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
if (!_propertiesSetForBroker.isEmpty())
{
for (String key : _propertiesSetForBroker.keySet())
{
- QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
- }
-
- if (env.containsKey("QPID_OPTS"))
- {
- env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS);
- }
- else
- {
- env.put("QPID_OPTS", QPID_OPTS);
+ qpidOpts += " -D" + key + "=" + _propertiesSetForBroker.get(key);
}
}
+ if (processEnv.containsKey("QPID_OPTS"))
+ {
+ qpidOpts = processEnv.get("QPID_OPTS") + qpidOpts;
+ }
+ processEnv.put("QPID_OPTS", qpidOpts);
+
+ _logger.info("Set test.config property to: " + testConfig);
+ _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig);
// cpp broker requires that the work directory is created
createBrokerWork(qpidWork);
@@ -545,12 +555,17 @@ public class QpidBrokerTestCase extends QpidTestCase
public String getTestConfigFile()
{
- return _output + "/" + getTestQueueName() + "-config.xml";
+ return getTestConfigFile(getPort());
}
- public String getTestVirtualhostsFile()
+ public String getTestConfigFile(int port)
{
- return _output + "/" + getTestQueueName() + "-virtualhosts.xml";
+ return _output + "/" + getTestQueueName() + "-" + port + "-config.xml";
+ }
+
+ public String getTestVirtualhostsFile(int port)
+ {
+ return _output + "/" + getTestQueueName() + "-" + port + "-virtualhosts.xml";
}
private String relativeToQpidHome(String file)
@@ -560,38 +575,50 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void saveTestConfiguration() throws ConfigurationException
{
+ String relative = saveTestConfiguration(getPort(), _testConfiguration);
+ _logger.info("Set test.config property to: " + relative);
+ setSystemProperty(TEST_CONFIG, relative);
+ }
+
+ protected String saveTestConfiguration(int port, XMLConfiguration testConfiguration) throws ConfigurationException
+ {
// Specify the test config file
- String testConfig = getTestConfigFile();
+ String testConfig = getTestConfigFile(port);
String relative = relativeToQpidHome(testConfig);
- setSystemProperty("test.config", relative);
- _logger.info("Set test.config property to: " + relative);
_logger.info("Saving test virtualhosts file at: " + testConfig);
// Create the file if configuration does not exist
- if (_testConfiguration.isEmpty())
+ if (testConfiguration.isEmpty())
{
- _testConfiguration.addProperty("__ignore", "true");
+ testConfiguration.addProperty("__ignore", "true");
}
- _testConfiguration.save(testConfig);
+ testConfiguration.save(testConfig);
+ return relative;
}
protected void saveTestVirtualhosts() throws ConfigurationException
{
+ String relative = saveTestVirtualhosts(getPort(), _testVirtualhosts);
+ _logger.info("Set test.virtualhosts property to: " + relative);
+ setSystemProperty(TEST_VIRTUALHOSTS, relative);
+ }
+
+ protected String saveTestVirtualhosts(int port, XMLConfiguration virtualHostConfiguration) throws ConfigurationException
+ {
// Specify the test virtualhosts file
- String testVirtualhosts = getTestVirtualhostsFile();
+ String testVirtualhosts = getTestVirtualhostsFile(port);
String relative = relativeToQpidHome(testVirtualhosts);
- setSystemProperty("test.virtualhosts", relative);
- _logger.info("Set test.virtualhosts property to: " + relative);
- _logger.info("Saving test virtualhosts file at: " + testVirtualhosts);
+ _logger.info("Set test.virtualhosts property to: " + testVirtualhosts);
// Create the file if configuration does not exist
- if (_testVirtualhosts.isEmpty())
+ if (virtualHostConfiguration.isEmpty())
{
- _testVirtualhosts.addProperty("__ignore", "true");
+ virtualHostConfiguration.addProperty("__ignore", "true");
}
- _testVirtualhosts.save(testVirtualhosts);
+ virtualHostConfiguration.save(testVirtualhosts);
+ return relative;
}
protected void cleanBrokerWork(final String qpidWork)
@@ -698,21 +725,21 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void makeVirtualHostPersistent(String virtualhost)
throws ConfigurationException, IOException
{
- Class<?> storeFactoryClass = null;
+ Class<?> storeClass = null;
try
{
// Try and lookup the BDB class
- storeFactoryClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory");
+ storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
}
catch (ClassNotFoundException e)
{
// No BDB store, we'll use Derby instead.
- storeFactoryClass = DerbyMessageStoreFactory.class;
+ storeClass = DerbyMessageStore.class;
}
- setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.factoryclass",
- storeFactoryClass.getName());
+ setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class",
+ storeClass.getName());
setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY,
"${QPID_WORK}/" + virtualhost);
}
@@ -1288,4 +1315,24 @@ public class QpidBrokerTestCase extends QpidTestCase
{
return FAILING_PORT;
}
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+
+ public void setTestVirtualhosts(XMLConfiguration testVirtualhosts)
+ {
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public XMLConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public void setTestConfiguration(XMLConfiguration testConfiguration)
+ {
+ _testConfiguration = testConfiguration;
+ }
}
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile
index 2cef1fd53e..cba348b67f 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-10.testprofile
@@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test-
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes Java010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
index 62c9385835..6e21a35425 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-8.testprofile
@@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
index cfc2a12dde..491775d452 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-9-1.testprofile
@@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
index 9cfa25eb9a..6e4c6cbd13 100644
--- a/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
+++ b/qpid/java/test-profiles/java-bdb-spawn.0-9.testprofile
@@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb.0-10.testprofile b/qpid/java/test-profiles/java-bdb.0-10.testprofile
index 4ac4c3baf2..3ef93a68cb 100644
--- a/qpid/java/test-profiles/java-bdb.0-10.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-10.testprofile
@@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test-
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes Java010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb.0-8.testprofile b/qpid/java/test-profiles/java-bdb.0-8.testprofile
index 76c3ea0b72..b20a5b39a7 100644
--- a/qpid/java/test-profiles/java-bdb.0-8.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-8.testprofile
@@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
index afc5f7bfd1..111d8f7867 100644
--- a/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-9-1.testprofile
@@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-bdb.0-9.testprofile b/qpid/java/test-profiles/java-bdb.0-9.testprofile
index 76bde0defc..9524f59f02 100644
--- a/qpid/java/test-profiles/java-bdb.0-9.testprofile
+++ b/qpid/java/test-profiles/java-bdb.0-9.testprofile
@@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT @EXCLUDES -c @CONFIG_FIL
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-bdb.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.berkeleydb.BDBMessageStore
profile.excludes=JavaExcludes JavaPersistentExcludes XAExcludes JavaPre010Excludes JavaBDBExcludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile
index 3b57dca346..906ea271e6 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-10.testprofile
@@ -23,7 +23,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test-
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes Java010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
index 9d421f706e..11670154a8 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-8.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
index 3038dd324e..7d60916ab5 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
index 6007105097..4eab65eab0 100644
--- a/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
+++ b/qpid/java/test-profiles/java-dby-spawn.0-9.testprofile
@@ -24,7 +24,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby.0-10.testprofile b/qpid/java/test-profiles/java-dby.0-10.testprofile
index 51f6c4ca6f..9bc6caf8a5 100644
--- a/qpid/java/test-profiles/java-dby.0-10.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-10.testprofile
@@ -24,7 +24,7 @@ broker.command=build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l test-
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes Java010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby.0-8.testprofile b/qpid/java/test-profiles/java-dby.0-8.testprofile
index c841c69922..8a0a57c202 100644
--- a/qpid/java/test-profiles/java-dby.0-8.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-8.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT --exclude-0-9 @PORT --exclude-0-9 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby.0-9-1.testprofile b/qpid/java/test-profiles/java-dby.0-9-1.testprofile
index fa01010d52..1208949805 100644
--- a/qpid/java/test-profiles/java-dby.0-9-1.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-9-1.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/java-dby.0-9.testprofile b/qpid/java/test-profiles/java-dby.0-9.testprofile
index d343185591..9cf1347c4d 100644
--- a/qpid/java/test-profiles/java-dby.0-9.testprofile
+++ b/qpid/java/test-profiles/java-dby.0-9.testprofile
@@ -25,7 +25,7 @@ broker.ready=BRK-1004
broker.stopped=Exception
broker.config=build/etc/config-systests-derby.xml
broker.protocol.excludes=--exclude-0-10 @PORT --exclude-0-10 @SSL_PORT --exclude-1-0 @PORT --exclude-1-0 @SSL_PORT --exclude-0-9-1 @PORT --exclude-0-9-1 @SSL_PORT
-messagestorefactory.class.name=org.apache.qpid.server.store.derby.DerbyMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.derby.DerbyMessageStore
profile.excludes=JavaPersistentExcludes JavaDerbyExcludes XAExcludes JavaPre010Excludes
broker.clean.between.tests=true
broker.persistent=true
diff --git a/qpid/java/test-profiles/testprofile.defaults b/qpid/java/test-profiles/testprofile.defaults
index 2c3c92e922..b0c1aea661 100644
--- a/qpid/java/test-profiles/testprofile.defaults
+++ b/qpid/java/test-profiles/testprofile.defaults
@@ -21,7 +21,7 @@ java.naming.provider.url=${test.profiles}/test-provider.properties
broker.ready=Listening on TCP
broker.config=build/etc/config-systests.xml
-messagestorefactory.class.name=org.apache.qpid.server.store.MemoryMessageStoreFactory
+messagestore.class.name=org.apache.qpid.server.store.MemoryMessageStore
broker.protocol.excludes=
broker.persistent=false