diff options
Diffstat (limited to 'qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java')
-rw-r--r-- | qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java | 535 |
1 files changed, 535 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java new file mode 100644 index 0000000000..e78ef34759 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.url.URLSyntaxException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.junit.Assert; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class GroupCreator +{ + protected static final Logger LOGGER = Logger.getLogger(GroupCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + + private static final int FAILOVER_CYCLECOUNT = 10; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 60; + private static final int CONNECTDELAY = 75; + + private final QpidBrokerTestCase _testcase; + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new TreeMap<Integer, Integer>(); + private final String _virtualHostName; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + + public GroupCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = virtualHostName; + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + int[] bdbPorts = new int[_numberOfNodes]; + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + bdbPorts[i] = bdbPort; + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + + String bluePrintJson = getBlueprint(_ipAddressOfBroker, bdbPorts); + + String helperName = null; + for (Map.Entry<Integer,Integer> entry: _brokerPortToBdbPortMap.entrySet()) + { + brokerPort = entry.getKey(); + int bdbPort = entry.getValue(); + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + String nodeName = getNodeNameForNodeAt(bdbPort); + if (helperName == null) + { + helperName = nodeName; + } + + Map<String, Object> virtualHostNodeAttributes = new HashMap<String, Object>(); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _groupName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, getHelperHostPort()); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperName); + + Map<String, String> context = new HashMap<>(); + context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrintJson); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); + + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.addHttpManagementConfiguration(); + brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort)); + brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes); + + } + _primaryBrokerPort = getPrimaryBrokerPort(); + } + + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(_primaryBrokerPort)); + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.DESIGNATED_PRIMARY, designatedPrimary); + config.setSaved(false); + } + + private int getPrimaryBrokerPort() + { + return _brokerPortToBdbPortMap.keySet().iterator().next(); + } + + public void startNode(final int brokerPortNumber) throws Exception + { + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size()); + try + { + List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber); + Future<Object> future = executor.submit(new Callable<Object>() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfig); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future<Object> future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.killBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set<Integer> getBdbPortNumbers() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); + } + + public ConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + return getConnectionUrlForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT); + } + + public ConnectionURL getConnectionUrlForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, connectDelay, retries)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, cyclecount)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); + } + + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException + { + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set<Integer> portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set<Integer> getBrokerPortNumbersForNodes() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.keySet()); + } + + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPortNumberToBeMoved)); + + Map<String, Object> objectAttributes = config.getObjectAttributes(VirtualHostNode.class, nodeName); + + String oldBdbHostPort = (String)objectAttributes.get(BDBHAVirtualHostNode.ADDRESS); + String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + String oldHost = oldHostAndPort[0]; + String newBdbHostPort = oldHost + ":" + newBdbPort; + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.ADDRESS, newBdbHostPort); + config.setSaved(false); + } + + public String getNodeNameForBrokerPort(final int brokerPort) + { + return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort)); + } + + public void setNodeAttributes(int brokerPort, Map<String, Object> attributeMap) + throws Exception + { + setNodeAttributes(brokerPort, brokerPort, attributeMap); + } + + public void setNodeAttributes(int localNodePort, int remoteNodePort, Map<String, Object> attributeMap) + throws Exception + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + String url = getNodeRestUrl(localNodePort, remoteNodePort); + int status = restHelper.submitRequest(url, "PUT", attributeMap); + if (status != 200) + { + throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute(s) : " + status); + } + } + + private String getNodeRestUrl(int localNodePort, int remoteNodePort) + { + String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort); + String localNodeName = getNodeNameForBrokerPort(localNodePort); + String url = null; + if (localNodePort == remoteNodePort) + { + url = "/api/latest/virtualhostnode/" + localNodeName; + } + else + { + url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName; + } + return url; + } + + public Map<String, Object> getNodeAttributes(int brokerPort) throws IOException + { + return getNodeAttributes(brokerPort, brokerPort); + } + + public Map<String, Object> getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + List<Map<String, Object>> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort)); + int size = results.size(); + if (size == 0) + { + return Collections.emptyMap(); + } + else if (size == 1) + { + return results.get(0); + } + else + { + throw new RuntimeException("Unexpected number of nodes " + size); + } + } + + public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception + { + awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole); + } + + public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + + while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role"); + data = getNodeAttributes(localNodePort, remoteNodePort); + if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE))) + { + Thread.sleep(1000); + } + } + LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE)); + Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE)); + } + + public RestTestHelper createRestTestHelper(int brokerPort) + { + int httpPort = _testcase.getHttpManagementPort(brokerPort); + RestTestHelper helper = new RestTestHelper(httpPort); + helper.setUsernameAndPassword("webadmin", "webadmin"); + return helper; + } + + public static String getBlueprint(String hostName, int... ports) throws Exception + { + List<String> permittedNodes = new ArrayList<String>(); + for (int port:ports) + { + permittedNodes.add(hostName + ":" + port); + } + Map<String,Object> bluePrint = new HashMap<>(); + bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); + bluePrint.put(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes); + + StringWriter writer = new StringWriter(); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + mapper.writeValue(writer, bluePrint); + return writer.toString(); + } +} |