summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java')
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java144
1 files changed, 49 insertions, 95 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 353c3a0ec5..4efe1967ce 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
@@ -26,7 +27,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -38,15 +38,19 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.url.URLSyntaxException;
+import com.sleepycat.je.rep.ReplicationConfig;
+
public class HATestClusterCreator
{
protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class);
@@ -65,17 +69,14 @@ public class HATestClusterCreator
private static final int CONNECTDELAY = 75;
private final QpidBrokerTestCase _testcase;
- private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>();
- private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>();
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new TreeMap<Integer, Integer>();
private final String _virtualHostName;
- private final String _vhostStoreConfigKeyPrefix;
private final String _ipAddressOfBroker;
private final String _groupName ;
private final int _numberOfNodes;
private int _bdbHelperPort;
private int _primaryBrokerPort;
- private String _vhostConfigKeyPrefix;
public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
{
@@ -84,8 +85,6 @@ public class HATestClusterCreator
_groupName = "group" + _testcase.getName();
_ipAddressOfBroker = getIpAddressOfBrokerHost();
_numberOfNodes = numberOfNodes;
- _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + ".";
- _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store.";
_bdbHelperPort = 0;
}
@@ -104,13 +103,26 @@ public class HATestClusterCreator
_bdbHelperPort = bdbPort;
}
- configureClusterNode(brokerPort, bdbPort);
+ String nodeName = getNodeNameForNodeAt(bdbPort);
+ Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+ messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName);
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort));
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort());
+ Map<String, String> repSettings = new HashMap<String, String>();
+ repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+ repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
+ messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings );
+
TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
brokerConfiguration.addJmxManagementConfiguration();
- collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts());
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings);
brokerPort = _testcase.getNextAvailable(bdbPort + 1);
}
+ _primaryBrokerPort = getPrimaryBrokerPort();
}
public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
@@ -119,35 +131,27 @@ public class HATestClusterCreator
{
throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
}
-
- final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
- final String configKey = getConfigKey("highAvailability.designatedPrimary");
- brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary));
- _primaryBrokerPort = brokerConfigEntry.getKey();
+ TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
+ config.setSaved(false);
}
- /**
- * @param configKeySuffix "highAvailability.designatedPrimary", for example
- * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example
- */
- private String getConfigKey(String configKeySuffix)
+ private int getPrimaryBrokerPort()
{
- final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts.");
- return configKey;
+ return _brokerPortToBdbPortMap.keySet().iterator().next();
}
public void startNode(final int brokerPortNumber) throws Exception
{
- final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
-
- _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts());
-
_testcase.startBroker(brokerPortNumber);
}
public void startCluster() throws Exception
{
- for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
{
startNode(brokerPortNumber);
}
@@ -155,21 +159,20 @@ public class HATestClusterCreator
public void startClusterParallel() throws Exception
{
- final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size());
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size());
try
{
List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
- for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
{
- final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber);
+ final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber);
Future<Object> future = executor.submit(new Callable<Object>()
{
public Object call()
{
try
{
- _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(),
- brokerConfigHolder.getTestVirtualhosts());
+ _testcase.startBroker(brokerPortNumber, brokerConfig);
return "OK";
}
catch (Exception e)
@@ -213,7 +216,7 @@ public class HATestClusterCreator
public void stopCluster() throws Exception
{
- for (final Integer brokerPortNumber : _brokerConfigurations.keySet())
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
{
try
{
@@ -345,22 +348,9 @@ public class HATestClusterCreator
public Set<Integer> getBrokerPortNumbersForNodes()
{
- return new HashSet<Integer>(_brokerConfigurations.keySet());
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.keySet());
}
- private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception
- {
- final String nodeName = getNodeNameForNodeAt(bdbPort);
-
-
- _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE);
- _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
-
- _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName);
- _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName);
- _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
- _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort());
- }
public String getIpAddressOfBrokerHost()
{
@@ -375,55 +365,19 @@ public class HATestClusterCreator
}
}
- private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
- {
- _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration,
- (XMLConfiguration) testVirtualhosts.clone()));
- }
-
- public class BrokerConfigHolder
- {
- private final TestBrokerConfiguration _testConfiguration;
- private final XMLConfiguration _testVirtualhosts;
-
- public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts)
- {
- _testConfiguration = testConfiguration;
- _testVirtualhosts = testVirtualhosts;
- }
-
- public TestBrokerConfiguration getTestConfiguration()
- {
- return _testConfiguration;
- }
-
- public XMLConfiguration getTestVirtualhosts()
- {
- return _testVirtualhosts;
- }
- }
-
public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
{
- final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved);
- final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts();
-
- final String configKey = getConfigKey("highAvailability.nodeHostPort");
- final String oldBdbHostPort = virtualHostConfig.getString(configKey);
-
- final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
- final String oldHost = oldHostAndPort[0];
-
- final String newBdbHostPort = oldHost + ":" + newBdbPort;
-
- virtualHostConfig.setProperty(configKey, newBdbHostPort);
- collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
+ TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS);
+ String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS);
+ String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ String oldHost = oldHostAndPort[0];
+ String newBdbHostPort = oldHost + ":" + newBdbPort;
+ storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort);
+ config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting);
+ config.setSaved(false);
}
- public String getStoreConfigKeyPrefix()
- {
- return _vhostStoreConfigKeyPrefix;
- }
-
-
}