diff options
Diffstat (limited to 'qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication')
6 files changed, 2033 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java new file mode 100644 index 0000000000..301375d0fb --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java @@ -0,0 +1,455 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpServletResponse; + +import com.sleepycat.je.Durability; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase +{ + private static final String NODE1 = "node1"; + private static final String NODE2 = "node2"; + private static final String NODE3 = "node3"; + + private int _node1HaPort; + private int _node2HaPort; + private int _node3HaPort; + + private String _hostName; + private String _baseNodeRestUrl; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + + super.setUp(); + _hostName = getTestName(); + _baseNodeRestUrl = "virtualhostnode/"; + + _node1HaPort = findFreePort(); + _node2HaPort = getNextAvailable(_node1HaPort + 1); + _node3HaPort = getNextAvailable(_node2HaPort + 1); + + + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + } + + public void testCreate3NodeGroup() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + createHANode(NODE2, _node2HaPort, _node1HaPort); + assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1); + createHANode(NODE3, _node3HaPort, _node1HaPort); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE2, NODE3); + } + + public void testMutateStateOfOneNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + mutateDesiredState(node1Url, "STOPPED"); + + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size()); + + Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); + + assertEquals("Node 1 observed from node 2 is in the wrong state", + "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); + assertEquals("Node 1 observed from node 2 has the wrong role", + "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); + + } + + public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + // Put virtualhost in STOPPED state + String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; + assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE"); + mutateDesiredState(virtualHostRestUrl, "STOPPED"); + assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED"); + + // Now stop node 1 to cause an election between nodes 2 & 3 + mutateDesiredState(node1Url, "STOPPED"); + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + + Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url); + + //Check the virtual host of the new master is in the stopped state + String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName; + assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED"); + } + + public void testDeleteReplicaNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + assertRemoteNodes(NODE1, NODE2, NODE3); + + List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); + assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size()); + + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE"); + assertEquals("Unexpected response code on deletion of virtual host node " + NODE2, 200, responseCode); + + int counter = 0; + while (data.size() != 1 && counter<50) + { + data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1); + if (data.size() != 1) + { + Thread.sleep(100l); + } + counter++; + } + assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size()); + } + + public void testDeleteMasterNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE2, NODE3); + + // change priority to make Node2 a master + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", Collections.<String,Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 100)); + assertEquals("Unexpected response code on priority update of virtual host node " + NODE2, 200, responseCode); + + List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size()); + + // delete master + responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE"); + assertEquals("Unexpected response code on deletion of virtual host node " + NODE1, 200, responseCode); + + // wait for new master + waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER"); + + // delete remote node + responseCode = getRestTestHelper().submitRequest("replicationnode/" + NODE2 + "/" + NODE1, "DELETE"); + assertEquals("Unexpected response code on deletion of remote node " + NODE1, 200, responseCode); + + int counter = 0; + while (data.size() != 1 && counter<50) + { + data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + if (data.size() != 1) + { + Thread.sleep(100l); + } + counter++; + } + assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); + } + + public void testIntruderBDBHAVHNNotAllowedNoConnect() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + + // add permitted node + Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE3); + + int intruderPort = getNextAvailable(_node3HaPort + 1); + + // try to add not permitted node + Map<String, Object> nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409); + + assertRemoteNodes(NODE1, NODE3); + } + + public void testIntruderProtection() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1); + + Map<String,Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); + String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH); + long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); + + // add permitted node + Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort); + getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201); + assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1); + assertRemoteNodes(NODE1, NODE3); + + // Ensure PINGDB is created + // in order to exclude hanging of environment + // when environment.close is called whilst PINGDB is created. + // On node joining, a record is updated in PINGDB + // if lastTransactionId is incremented then node ping task was executed + int counter = 0; + long newTransactionId = transactionId; + while(newTransactionId == transactionId && counter<50) + { + nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1); + newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); + if (newTransactionId != transactionId) + { + break; + } + counter++; + Thread.sleep(100l); + } + + //connect intruder node + String nodeName = NODE2; + String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1); + File environmentPathFile = new File(node1StorePath, nodeName); + environmentPathFile.mkdirs(); + ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort); + replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS)); + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setAllowCreate(true); + envConfig.setTransactional(true); + envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY))); + + ReplicatedEnvironment intruder = null; + try + { + intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); + } + finally + { + if (intruder != null) + { + intruder.close(); + } + } + waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name()); + waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name()); + } + + private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception + { + Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort); + + int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData); + assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode); + String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name(); + waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState); + } + + private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception + { + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(BDBHAVirtualHostNode.NAME, nodeName); + nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort); + nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort); + nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1); + Map<String,String> context = new HashMap<>(); + nodeData.put(BDBHAVirtualHostNode.CONTEXT, context); + String bluePrint = GroupCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); + return nodeData; + } + + private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception + { + boolean isMaster = nodeName.equals(masterNode); + String expectedRole = isMaster? "MASTER" : "REPLICA"; + waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole); + + Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0"); + assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME)); + assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE)); + assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS)); + assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS)); + assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME)); + assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE)); + + Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); + assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId); + assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); + + Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); + assertNotNull("Unexpected joinTime", joinTime); + assertTrue("Unexpected joinTime " + joinTime, joinTime > 0); + + if (isMaster) + { + waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name()); + } + + } + + private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception + { + List<String> clusterNodes = new ArrayList<String>(Arrays.asList(replicaNodes)); + clusterNodes.add(masterNode); + + for (String clusterNodeName : clusterNodes) + { + List<String> remotes = new ArrayList<String>(clusterNodes); + remotes.remove(clusterNodeName); + for (String remote : remotes) + { + String remoteUrl = "replicationnode/" + clusterNodeName + "/" + remote; + Map<String, Object> nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA"); + assertRemoteNodeData(remote, nodeData); + } + } + } + + private void assertRemoteNodeData(String name, Map<String, Object> nodeData) + { + assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME)); + + Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); + assertNotNull("Node " + name + " has unexpected lastKnownReplicationId", lastKnownTransactionId); + assertTrue("Node " + name + " has unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0); + + Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME); + assertNotNull("Node " + name + " has unexpected joinTime", joinTime); + assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0); + } + + private void assertActualAndDesiredStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map<String, Object> objectData = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData); + } + + private void mutateDesiredState(final String restUrl, final String newState) throws IOException + { + Map<String, Object> newAttributes = new HashMap<String, Object>(); + newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); + + getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + } + + private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName) + { + Map<String, Object> foundNode = null; + for (Map<String, Object> remoteNode : remoteNodes) + { + if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME))) + { + foundNode = remoteNode; + break; + } + } + assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes."); + return foundNode; + } + + private Map<String, Object> awaitNewMaster(final String... nodeUrls) + throws IOException, InterruptedException + { + Map<String, Object> newMasterData = null; + int counter = 0; + while (newMasterData == null && counter < 50) + { + for(String nodeUrl: nodeUrls) + { + Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl); + if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE))) + { + newMasterData = nodeData; + break; + } + } + if (newMasterData == null) + { + Thread.sleep(100l); + counter++; + } + } + assertNotNull("Could not find new master", newMasterData); + return newMasterData; + } + + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java new file mode 100644 index 0000000000..07ce033a55 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY; +import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +public class BDBHAVirtualHostRestTest extends QpidRestTestCase +{ + private String _hostName; + private File _storeBaseDir; + private int _nodeHaPort; + private Object _nodeName; + private String _virtualhostUrl; + private String _bluePrint; + + @Override + public void setUp() throws Exception + { + setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000"); + _hostName = "ha"; + _nodeName = "node1"; + _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1); + _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName; + _bluePrint = GroupCreator.getBlueprint("localhost", _nodeHaPort); + + super.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeBaseDir != null) + { + FileUtils.delete(_storeBaseDir, true); + } + } + } + + @Override + protected void customizeConfiguration() throws IOException + { + super.customizeConfiguration(); + TestBrokerConfiguration config = getBrokerConfiguration(); + config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST); + config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST); + + Map<String, Object> nodeAttributes = new HashMap<String, Object>(); + nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA"); + nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName); + nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName); + nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort); + nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, _nodeName); + Map<String, String> context = new HashMap<String,String>(); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, _bluePrint); + + nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); + config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes); + } + + public void testSetLocalTransactionSynchronizationPolicy() throws Exception + { + Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); + + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)); + } + + public void testSetRemoteTransactionSynchronizationPolicy() throws Exception + { + Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name()); + assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); + + Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl); + assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY)); + } + + public void testMutateState() throws Exception + { + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + + Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED"); + assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED"); + + newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE"); + getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + + waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE"); + assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE"); + } + + private void assertActualAndDesireStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost); + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java new file mode 100644 index 0000000000..e78ef34759 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; +import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.url.URLSyntaxException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.junit.Assert; + +import com.sleepycat.je.rep.ReplicationConfig; + +public class GroupCreator +{ + protected static final Logger LOGGER = Logger.getLogger(GroupCreator.class); + + private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + + private static final int FAILOVER_CYCLECOUNT = 10; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + + private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''"; + private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'"; + + private static final int RETRIES = 60; + private static final int CONNECTDELAY = 75; + + private final QpidBrokerTestCase _testcase; + private final Map<Integer, Integer> _brokerPortToBdbPortMap = new TreeMap<Integer, Integer>(); + private final String _virtualHostName; + + private final String _ipAddressOfBroker; + private final String _groupName ; + private final int _numberOfNodes; + private int _bdbHelperPort; + private int _primaryBrokerPort; + + public GroupCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes) + { + _testcase = testcase; + _virtualHostName = virtualHostName; + _groupName = virtualHostName; + _ipAddressOfBroker = getIpAddressOfBrokerHost(); + _numberOfNodes = numberOfNodes; + _bdbHelperPort = 0; + } + + public void configureClusterNodes() throws Exception + { + int brokerPort = _testcase.findFreePort(); + + int[] bdbPorts = new int[_numberOfNodes]; + for (int i = 0; i < _numberOfNodes; i++) + { + int bdbPort = _testcase.getNextAvailable(brokerPort + 1); + bdbPorts[i] = bdbPort; + _brokerPortToBdbPortMap.put(brokerPort, bdbPort); + brokerPort = _testcase.getNextAvailable(bdbPort + 1); + } + + String bluePrintJson = getBlueprint(_ipAddressOfBroker, bdbPorts); + + String helperName = null; + for (Map.Entry<Integer,Integer> entry: _brokerPortToBdbPortMap.entrySet()) + { + brokerPort = entry.getKey(); + int bdbPort = entry.getValue(); + LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort); + if (_bdbHelperPort == 0) + { + _bdbHelperPort = bdbPort; + } + + String nodeName = getNodeNameForNodeAt(bdbPort); + if (helperName == null) + { + helperName = nodeName; + } + + Map<String, Object> virtualHostNodeAttributes = new HashMap<String, Object>(); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _groupName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, getNodeHostPortForNodeAt(bdbPort)); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, getHelperHostPort()); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperName); + + Map<String, String> context = new HashMap<>(); + context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); + context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0"); + context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrintJson); + virtualHostNodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context); + + TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort); + brokerConfiguration.addJmxManagementConfiguration(); + brokerConfiguration.addHttpManagementConfiguration(); + brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort)); + brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes); + + } + _primaryBrokerPort = getPrimaryBrokerPort(); + } + + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(_primaryBrokerPort)); + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.DESIGNATED_PRIMARY, designatedPrimary); + config.setSaved(false); + } + + private int getPrimaryBrokerPort() + { + return _brokerPortToBdbPortMap.keySet().iterator().next(); + } + + public void startNode(final int brokerPortNumber) throws Exception + { + _testcase.startBroker(brokerPortNumber); + } + + public void startCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + startNode(brokerPortNumber); + } + } + + public void startClusterParallel() throws Exception + { + final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size()); + try + { + List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>(); + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber); + Future<Object> future = executor.submit(new Callable<Object>() + { + public Object call() + { + try + { + _testcase.startBroker(brokerPortNumber, brokerConfig); + return "OK"; + } + catch (Exception e) + { + return e; + } + } + }); + brokers.add(future); + } + for (Future<Object> future : brokers) + { + Object result = future.get(30, TimeUnit.SECONDS); + LOGGER.debug("Node startup result:" + result); + if (result instanceof Exception) + { + throw (Exception) result; + } + else if (!"OK".equals(result)) + { + throw new Exception("One of the cluster nodes is not started"); + } + } + } + catch (Exception e) + { + stopCluster(); + throw e; + } + finally + { + executor.shutdown(); + } + + } + + public void stopNode(final int brokerPortNumber) + { + _testcase.killBroker(brokerPortNumber); + } + + public void stopCluster() throws Exception + { + for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet()) + { + try + { + stopNode(brokerPortNumber); + } + catch(Exception e) + { + LOGGER.warn("Failed to stop node on port:" + brokerPortNumber); + } + } + } + + public int getBrokerPortNumberFromConnection(Connection connection) + { + final AMQConnection amqConnection = (AMQConnection)connection; + return amqConnection.getActiveBrokerDetails().getPort(); + } + + public int getPortNumberOfAnInactiveBroker(final Connection activeConnection) + { + final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers(); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection); + allBrokerPorts.remove(activeBrokerPort); + LOGGER.debug("Broker ports:" + allBrokerPorts); + final int inactiveBrokerPort = allBrokerPorts.iterator().next(); + return inactiveBrokerPort; + } + + public int getBdbPortForBrokerPort(final int brokerPortNumber) + { + return _brokerPortToBdbPortMap.get(brokerPortNumber); + } + + public Set<Integer> getBdbPortNumbers() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.values()); + } + + public ConnectionURL getConnectionUrlForAllClusterNodes() throws Exception + { + return getConnectionUrlForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT); + } + + public ConnectionURL getConnectionUrlForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception + { + final StringBuilder brokerList = new StringBuilder(); + + for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); ) + { + int brokerPortNumber = itr.next(); + + brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, connectDelay, retries)); + if (itr.hasNext()) + { + brokerList.append(";"); + } + } + + return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, cyclecount)); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, false); + } + + public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException + { + return getConnectionUrlForSingleNode(brokerPortNumber, true); + } + + private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException + { + final String url; + if (retryAllowed) + { + url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES); + } + else + { + url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber); + } + + return new AMQConnectionURL(url); + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeNameForNodeAt(final int bdbPort) + { + return "node" + _testcase.getName() + bdbPort; + } + + public String getNodeHostPortForNodeAt(final int bdbPort) + { + return _ipAddressOfBroker + ":" + bdbPort; + } + + public String getHelperHostPort() + { + if (_bdbHelperPort == 0) + { + throw new IllegalStateException("Helper port not yet assigned."); + } + + return _ipAddressOfBroker + ":" + _bdbHelperPort; + } + + public void setHelperHostPort(int bdbHelperPort) + { + _bdbHelperPort = bdbHelperPort; + } + + public int getBrokerPortNumberOfPrimary() + { + if (_numberOfNodes != 2) + { + throw new IllegalArgumentException("Only two nodes groups have the concept of primary"); + } + + return _primaryBrokerPort; + } + + public int getBrokerPortNumberOfSecondaryNode() + { + final Set<Integer> portNumbers = getBrokerPortNumbersForNodes(); + portNumbers.remove(getBrokerPortNumberOfPrimary()); + return portNumbers.iterator().next(); + } + + public Set<Integer> getBrokerPortNumbersForNodes() + { + return new HashSet<Integer>(_brokerPortToBdbPortMap.keySet()); + } + + + public String getIpAddressOfBrokerHost() + { + String brokerHost = _testcase.getBroker().getHost(); + try + { + return InetAddress.getByName(brokerHost).getHostAddress(); + } + catch (UnknownHostException e) + { + throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e); + } + } + + public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort) + { + TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved); + String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPortNumberToBeMoved)); + + Map<String, Object> objectAttributes = config.getObjectAttributes(VirtualHostNode.class, nodeName); + + String oldBdbHostPort = (String)objectAttributes.get(BDBHAVirtualHostNode.ADDRESS); + String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":"); + String oldHost = oldHostAndPort[0]; + String newBdbHostPort = oldHost + ":" + newBdbPort; + config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.ADDRESS, newBdbHostPort); + config.setSaved(false); + } + + public String getNodeNameForBrokerPort(final int brokerPort) + { + return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort)); + } + + public void setNodeAttributes(int brokerPort, Map<String, Object> attributeMap) + throws Exception + { + setNodeAttributes(brokerPort, brokerPort, attributeMap); + } + + public void setNodeAttributes(int localNodePort, int remoteNodePort, Map<String, Object> attributeMap) + throws Exception + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + String url = getNodeRestUrl(localNodePort, remoteNodePort); + int status = restHelper.submitRequest(url, "PUT", attributeMap); + if (status != 200) + { + throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute(s) : " + status); + } + } + + private String getNodeRestUrl(int localNodePort, int remoteNodePort) + { + String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort); + String localNodeName = getNodeNameForBrokerPort(localNodePort); + String url = null; + if (localNodePort == remoteNodePort) + { + url = "/api/latest/virtualhostnode/" + localNodeName; + } + else + { + url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName; + } + return url; + } + + public Map<String, Object> getNodeAttributes(int brokerPort) throws IOException + { + return getNodeAttributes(brokerPort, brokerPort); + } + + public Map<String, Object> getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException + { + RestTestHelper restHelper = createRestTestHelper(localNodePort); + List<Map<String, Object>> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort)); + int size = results.size(); + if (size == 0) + { + return Collections.emptyMap(); + } + else if (size == 1) + { + return results.get(0); + } + else + { + throw new RuntimeException("Unexpected number of nodes " + size); + } + } + + public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception + { + awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole); + } + + public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + + while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role"); + data = getNodeAttributes(localNodePort, remoteNodePort); + if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE))) + { + Thread.sleep(1000); + } + } + LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE)); + Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE)); + } + + public RestTestHelper createRestTestHelper(int brokerPort) + { + int httpPort = _testcase.getHttpManagementPort(brokerPort); + RestTestHelper helper = new RestTestHelper(httpPort); + helper.setUsernameAndPassword("webadmin", "webadmin"); + return helper; + } + + public static String getBlueprint(String hostName, int... ports) throws Exception + { + List<String> permittedNodes = new ArrayList<String>(); + for (int port:ports) + { + permittedNodes.add(hostName + ":" + port); + } + Map<String,Object> bluePrint = new HashMap<>(); + bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); + bluePrint.put(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes); + + StringWriter writer = new StringWriter(); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + mapper.writeValue(writer, bluePrint); + return writer.toString(); + } +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java new file mode 100644 index 0000000000..c6f005c0e7 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.Connection; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.junit.Assert; + +/** + * System test verifying the ability to control a cluster via the Management API. + * + * @see MultiNodeTest + */ +public class JMXManagementTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(JMXManagementTest.class); + + private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));; + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 4; + + private final GroupCreator _clusterCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + _clusterCreator.configureClusterNodes(); + _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); + _clusterCreator.startCluster(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testReadonlyMBeanAttributes() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName()); + assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName()); + assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort()); + assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort()); + // As we have chosen an arbitrary broker from the cluster, we cannot predict its state + assertNotNull("Store state must not be null", storeBean.getNodeState()); + } + + public void testStateOfActiveBrokerIsMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber); + assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState()); + } + + public void testStateOfNonActiveBrokerIsNotMaster() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + final String nodeState = storeBean.getNodeState(); + assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState)); + } + + public void testGroupMembers() throws Exception + { + final int brokerPortNumber = getBrokerPortNumbers().iterator().next(); + + ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final TabularData groupMembers = storeBean.getAllNodesInGroup(); + assertNotNull(groupMembers); + + for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers()) + { + final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber); + final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber); + + CompositeData row = groupMembers.get(new Object[] {nodeName}); + assertNotNull("Table does not contain row for node name " + nodeName, row); + assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT)); + } + } + + public void testRemoveRemoteNodeFromGroup() throws Exception + { + final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator(); + final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next(); + final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation); + awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES); + + final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved)); + _clusterCreator.stopNode(brokerPortNumberToBeRemoved); + + storeBean.removeNodeFromGroup(removedNodeName); + + long limitTime = System.currentTimeMillis() + 5000; + while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime) + { + Thread.sleep(100l); + } + + int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size(); + assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval); + } + + public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception + { + final Connection activeConnection = getConnection(_brokerFailoverUrl); + final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection); + + ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber); + + try + { + inactiveBroker.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + + try + { + inactiveBroker.createNewExchange(getName(), "direct", true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + } + + public void testSetDesignatedPrimary() throws Exception + { + int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next(); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); + assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + long limit = System.currentTimeMillis() + 5000; + while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100l); + } + assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); + } + + public void testVirtualHostMbeanOnMasterTransfer() throws Exception + { + Connection connection = getConnection(_brokerFailoverUrl); + int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + connection.close(); + + Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes(); + ports.remove(activeBrokerPort); + + int inactiveBrokerPort = ports.iterator().next(); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + + try + { + inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true); + fail("Exception not thrown"); + } + catch (Exception e) + { + String message = e.getMessage(); + assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message); + } + + Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER"); + + awaitVirtualHostAtNode(inactiveBrokerPort); + + ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort); + activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true); + } + + public void awaitVirtualHostAtNode(int brokerPort) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort); + RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort); + while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state"); + List<Map<String, Object>> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST); + if (results.size()== 1) + { + data = results.get(0); + } + + if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE))) + { + Thread.sleep(1000); + } + } + Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE)); + LOGGER.debug("Virtual host '" + nodeName + "' is in active state"); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + } + + private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception + { + _jmxUtils.open(brokerPortNumber); + + return _jmxUtils.getManagedBroker(VIRTUAL_HOST); + } + + private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception + { + long totalTimeWaited = 0l; + long waitInterval = 100l; + long maxWaitTime = 10000; + + int currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime) + { + LOGGER.debug("Still awaiting nodes to join group; expecting " + + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes + + " after " + totalTimeWaited + " ms."); + + totalTimeWaited += waitInterval; + Thread.sleep(waitInterval); + + currentNumberOfNodes = storeBean.getAllNodesInGroup().size(); + } + + assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms", + expectedNumberOfNodes ,currentNumberOfNodes); + } +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java new file mode 100644 index 0000000000..d6ba419de1 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; + +/** + * The HA black box tests test the BDB cluster as a opaque unit. Client connects to + * the cluster via a failover url + */ +public class MultiNodeTest extends QpidBrokerTestCase +{ + protected static final Logger LOGGER = Logger.getLogger(MultiNodeTest.class); + + private static final String VIRTUAL_HOST = "test"; + private static final int NUMBER_OF_NODES = 3; + + private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + + private FailoverAwaitingListener _failoverListener; + + /** Used when expectation is client will (re)-connect */ + private ConnectionURL _positiveFailoverUrl; + + /** Used when expectation is client will not (re)-connect */ + private ConnectionURL _negativeFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + + _groupCreator.configureClusterNodes(); + + _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2); + + _groupCreator.startCluster(); + _failoverListener = new FailoverAwaitingListener(); + + super.setUp(); + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + public void testLossOfMasterNodeCausesClientToFailover() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + _groupCreator.stopNode(activeBrokerPort); + LOGGER.info("Node is stopped"); + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); + + LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort); + + _groupCreator.stopNode(inactiveBrokerPort); + + _failoverListener.assertNoFailoverCompletionWithin(2000); + + assertProducingConsuming(connection); + } + + public void testLossOfQuorumCausesClientDisconnection() throws Exception + { + final Connection connection = getConnection(_negativeFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + ports.remove(activeBrokerPort); + + // Stop all other nodes + for (Integer p : ports) + { + _groupCreator.stopNode(p); + } + + try + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + session.createConsumer(destination).close(); + fail("Exception not thrown - creating durable queue should fail without quorum"); + } + catch(JMSException jms) + { + // PASS + } + + // New connections should now fail as vhost will be unavailable + try + { + getConnection(_negativeFailoverUrl); + fail("Exception not thrown"); + } + catch (JMSException je) + { + // PASS + } + } + + public void testPersistentMessagesAvailableAfterFailover() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + + Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = producingSession.createQueue(getTestQueueName()); + producingSession.createConsumer(queue).close(); + sendMessage(producingSession, queue, 10); + + _groupCreator.stopNode(activeBrokerPort); + LOGGER.info("Old master (broker port " + activeBrokerPort + ") is stopped"); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Failover has finished"); + + final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("New master (broker port " + activeBrokerPort + ") after failover"); + + Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumingSession.createConsumer(queue); + + connection.start(); + for(int i = 0; i < 10; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message " + i + " is not received", m); + assertEquals("Unexpected message received", i, m.getIntProperty(INDEX)); + } + consumingSession.commit(); + } + + public void testTransferMasterFromLocalNode() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + _groupCreator.setNodeAttributes(inactiveBrokerPort, + Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testTransferMasterFromRemoteNode() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + + _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + } + + public void testQuorumOverride() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + ports.remove(activeBrokerPort); + + // Stop all other nodes + for (Integer p : ports) + { + _groupCreator.stopNode(p); + } + + Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + _groupCreator.setNodeAttributes(activeBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); + + attributes = _groupCreator.getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + + assertProducingConsuming(connection); + } + + public void testPriority() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverListener); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + int priority = 1; + Integer highestPriorityBrokerPort = null; + Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + for (Integer port : ports) + { + if (activeBrokerPort != port.intValue()) + { + priority = priority + 1; + highestPriorityBrokerPort = port; + _groupCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port); + assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); + } + } + + LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority); + + LOGGER.info("Shutting down the MASTER"); + _groupCreator.stopNode(activeBrokerPort); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Listener has finished"); + + Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + assertProducingConsuming(connection); + } + + private final class FailoverAwaitingListener implements ConnectionListener + { + private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1); + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + public void awaitFailoverCompletion(long delay) throws InterruptedException + { + if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS)) + { + LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); + } + assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount()); + } + + public void assertNoFailoverCompletionWithin(long delay) throws InterruptedException + { + _failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS); + assertEquals("Failover occurred unexpectedly", 1L, _failoverCompletionLatch.getCount()); + } + + @Override + public void failoverComplete() + { + _failoverCompletionLatch.countDown(); + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + } + +} diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java new file mode 100644 index 0000000000..0f8a1609de --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.management.ObjectName; + +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class TwoNodeTest extends QpidBrokerTestCase +{ + private static final String VIRTUAL_HOST = "test"; + + private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST); + private static final int NUMBER_OF_NODES = 2; + + private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); + private final JMXTestUtils _jmxUtils = new JMXTestUtils(this); + + private ConnectionURL _brokerFailoverUrl; + + @Override + protected void setUp() throws Exception + { + _brokerType = BrokerType.SPAWNED; + + assertTrue(isJavaBroker()); + assertTrue(isBrokerStorePersistent()); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + _jmxUtils.close(); + } + finally + { + super.tearDown(); + } + } + + @Override + public void startBroker() throws Exception + { + // Don't start default broker provided by QBTC. + } + + private void startCluster(boolean designedPrimary) throws Exception + { + setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); + _groupCreator.configureClusterNodes(); + _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); + _brokerFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(); + _groupCreator.startCluster(); + } + + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception + { + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _groupCreator.stopCluster(); + _groupCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testClusterRestartWithoutDesignatedPrimary() throws Exception + { + startCluster(false); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _groupCreator.stopCluster(); + _groupCreator.startClusterParallel(); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); + } + + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + { + startCluster(true); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to primary", connection); + assertProducingConsuming(connection); + } + + public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception + { + startCluster(false); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + + try + { + Connection connection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(connection); + fail("Exception not thrown"); + } + catch(JMSException e) + { + // JMSException should be thrown either on getConnection, or produce/consume + // depending on whether the relative timing of the node discovering that the + // secondary has gone. + } + } + + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception + { + startCluster(true); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); + + try + { + getConnection(_brokerFailoverUrl); + fail("Connection not expected"); + } + catch (JMSException e) + { + // PASS + } + } + + public void testInitialDesignatedPrimaryStateOfNodes() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfPrimary()); + assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary()); + + final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary()); + } + + public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception + { + startCluster(true); + final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); + + assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary()); + storeBean.setDesignatedPrimary(true); + + long limit = System.currentTimeMillis() + 5000; + while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100); + } + assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary()); + + final Connection connection = getConnection(_brokerFailoverUrl); + assertNotNull("Expected to get a valid connection to new primary", connection); + assertProducingConsuming(connection); + } + + private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort( + final int activeBrokerPortNumber) throws Exception + { + _jmxUtils.open(activeBrokerPortNumber); + + ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY); + return storeBean; + } + +} |