diff options
Diffstat (limited to 'qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java')
-rw-r--r-- | qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java | 144 |
1 files changed, 49 insertions, 95 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 353c3a0ec5..4efe1967ce 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; @@ -26,7 +27,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -38,15 +38,19 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.url.URLSyntaxException; +import com.sleepycat.je.rep.ReplicationConfig; + public class HATestClusterCreator { protected static final Logger LOGGER = Logger.getLogger(HATestClusterCreator.class); @@ -65,17 +69,14 @@ public class HATestClusterCreator private static final int CONNECTDELAY = 75; private final QpidBrokerTestCase _testcase; - private final Map<Integer, Integer> _brokerPortToBdbPortMap = new HashMap<Integer, Integer>(); - private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new TreeMap<Integer, BrokerConfigHolder>(); + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new TreeMap<Integer, Integer>(); private final String _virtualHostName; - private final String _vhostStoreConfigKeyPrefix; private final String _ipAddressOfBroker; private final String _groupName ; private final int _numberOfNodes; private int _bdbHelperPort; private int _primaryBrokerPort; - private String _vhostConfigKeyPrefix; public HATestClusterCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) { @@ -84,8 +85,6 @@ public class HATestClusterCreator _groupName = "group" + _testcase.getName(); _ipAddressOfBroker = getIpAddressOfBrokerHost(); _numberOfNodes = numberOfNodes; - _vhostConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + "."; - _vhostStoreConfigKeyPrefix = _vhostConfigKeyPrefix + "store."; _bdbHelperPort = 0; } @@ -104,13 +103,26 @@ public class HATestClusterCreator _bdbHelperPort = bdbPort; } - configureClusterNode(brokerPort, bdbPort); + String nodeName = getNodeNameForNodeAt(bdbPort); + Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); + messageStoreSettings.put(MessageStore.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.GROUP_NAME, _groupName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_NAME, nodeName); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.HELPER_ADDRESS, getHelperHostPort()); + Map<String, String> repSettings = new HashMap<String, String>(); + repSettings.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + repSettings.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); + messageStoreSettings.put(ReplicatedEnvironmentFacadeFactory.REPLICATION_CONFIG, repSettings ); + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); brokerConfiguration.addJmxManagementConfiguration(); - collectConfig(brokerPort, brokerConfiguration, _testcase.getTestVirtualhosts()); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + brokerConfiguration.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, messageStoreSettings); brokerPort = _testcase.getNextAvailable(bdbPort + 1); } + _primaryBrokerPort = getPrimaryBrokerPort(); } public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception @@ -119,35 +131,27 @@ public class HATestClusterCreator { throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); } - - final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); - final String configKey = getConfigKey("highAvailability.designatedPrimary"); - brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary)); - _primaryBrokerPort = brokerConfigEntry.getKey(); + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + storeSetting.put(ReplicatedEnvironmentFacadeFactory.DESIGNATED_PRIMARY, designatedPrimary); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); + config.setSaved(false); } - /** - * @param configKeySuffix "highAvailability.designatedPrimary", for example - * @return "virtualhost.test.store.highAvailability.designatedPrimary", for example - */ - private String getConfigKey(String configKeySuffix) + private int getPrimaryBrokerPort() { - final String configKey = StringUtils.substringAfter(_vhostStoreConfigKeyPrefix + configKeySuffix, "virtualhosts."); - return configKey; + return _brokerPortToBdbPortMap.keySet().iterator().next(); } public void startNode(final int brokerPortNumber) throws Exception { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); - - _testcase.setTestVirtualhosts(brokerConfigHolder.getTestVirtualhosts()); - _testcase.startBroker(brokerPortNumber); } public void startCluster() throws Exception { - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) { startNode(brokerPortNumber); } @@ -155,21 +159,20 @@ public class HATestClusterCreator public void startClusterParallel() throws Exception { - final ExecutorService executor = Executors.newFixedThreadPool(_brokerConfigurations.size()); + final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size()); try { List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumber); + final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber); Future<Object> future = executor.submit(new Callable<Object>() { public Object call() { try { - _testcase.startBroker(brokerPortNumber, brokerConfigHolder.getTestConfiguration(), - brokerConfigHolder.getTestVirtualhosts()); + _testcase.startBroker(brokerPortNumber, brokerConfig); return "OK"; } catch (Exception e) @@ -213,7 +216,7 @@ public class HATestClusterCreator public void stopCluster() throws Exception { - for (final Integer brokerPortNumber : _brokerConfigurations.keySet()) + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) { try { @@ -345,22 +348,9 @@ public class HATestClusterCreator public Set<Integer> getBrokerPortNumbersForNodes() { - return new HashSet<Integer>(_brokerConfigurations.keySet()); + return new HashSet<Integer>(_brokerPortToBdbPortMap.keySet()); } - private void configureClusterNode(final int brokerPort, final int bdbPort) throws Exception - { - final String nodeName = getNodeNameForNodeAt(bdbPort); - - - _testcase.setVirtualHostConfigurationProperty(_vhostConfigKeyPrefix + "type", BDBHAVirtualHostFactory.TYPE); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "class", "org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore"); - - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.groupName", _groupName); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeName", nodeName); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort)); - _testcase.setVirtualHostConfigurationProperty(_vhostStoreConfigKeyPrefix + "highAvailability.helperHostPort", getHelperHostPort()); - } public String getIpAddressOfBrokerHost() { @@ -375,55 +365,19 @@ public class HATestClusterCreator } } - private void collectConfig(final int brokerPortNumber, TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) - { - _brokerConfigurations.put(brokerPortNumber, new BrokerConfigHolder(testConfiguration, - (XMLConfiguration) testVirtualhosts.clone())); - } - - public class BrokerConfigHolder - { - private final TestBrokerConfiguration _testConfiguration; - private final XMLConfiguration _testVirtualhosts; - - public BrokerConfigHolder(TestBrokerConfiguration testConfiguration, XMLConfiguration testVirtualhosts) - { - _testConfiguration = testConfiguration; - _testVirtualhosts = testVirtualhosts; - } - - public TestBrokerConfiguration getTestConfiguration() - { - return _testConfiguration; - } - - public XMLConfiguration getTestVirtualhosts() - { - return _testVirtualhosts; - } - } - public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) { - final BrokerConfigHolder brokerConfigHolder = _brokerConfigurations.get(brokerPortNumberToBeMoved); - final XMLConfiguration virtualHostConfig = brokerConfigHolder.getTestVirtualhosts(); - - final String configKey = getConfigKey("highAvailability.nodeHostPort"); - final String oldBdbHostPort = virtualHostConfig.getString(configKey); - - final String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); - final String oldHost = oldHostAndPort[0]; - - final String newBdbHostPort = oldHost + ":" + newBdbPort; - - virtualHostConfig.setProperty(configKey, newBdbHostPort); - collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig); + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); + + @SuppressWarnings("unchecked") + Map<String, Object> storeSetting = (Map<String, Object>) config.getObjectAttributes(_virtualHostName).get(VirtualHost.MESSAGE_STORE_SETTINGS); + String oldBdbHostPort = (String) storeSetting.get(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS); + String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + String oldHost = oldHostAndPort[0]; + String newBdbHostPort = oldHost + ":" + newBdbPort; + storeSetting.put(ReplicatedEnvironmentFacadeFactory.NODE_ADDRESS, newBdbHostPort); + config.setObjectAttribute(_virtualHostName, VirtualHost.MESSAGE_STORE_SETTINGS, storeSetting); + config.setSaved(false); } - public String getStoreConfigKeyPrefix() - { - return _vhostStoreConfigKeyPrefix; - } - - } |