summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication')
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java455
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java156
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java535
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java321
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java371
-rw-r--r--qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java195
6 files changed, 2033 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
new file mode 100644
index 0000000000..301375d0fb
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
@@ -0,0 +1,455 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletResponse;
+
+import com.sleepycat.je.Durability;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
+import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.Asserts;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
+{
+ private static final String NODE1 = "node1";
+ private static final String NODE2 = "node2";
+ private static final String NODE3 = "node3";
+
+ private int _node1HaPort;
+ private int _node2HaPort;
+ private int _node3HaPort;
+
+ private String _hostName;
+ private String _baseNodeRestUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+
+ super.setUp();
+ _hostName = getTestName();
+ _baseNodeRestUrl = "virtualhostnode/";
+
+ _node1HaPort = findFreePort();
+ _node2HaPort = getNextAvailable(_node1HaPort + 1);
+ _node3HaPort = getNextAvailable(_node2HaPort + 1);
+
+
+ }
+
+ @Override
+ protected void customizeConfiguration() throws IOException
+ {
+ super.customizeConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+ }
+
+ public void testCreate3NodeGroup() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ assertNode(NODE2, _node2HaPort, _node1HaPort, NODE1);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+ assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE2, NODE3);
+ }
+
+ public void testMutateStateOfOneNode() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ String node1Url = _baseNodeRestUrl + NODE1;
+ String node2Url = _baseNodeRestUrl + NODE2;
+ String node3Url = _baseNodeRestUrl + NODE3;
+
+ assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ mutateDesiredState(node1Url, "STOPPED");
+
+ assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2);
+ assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
+
+ Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1);
+
+ assertEquals("Node 1 observed from node 2 is in the wrong state",
+ "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE));
+ assertEquals("Node 1 observed from node 2 has the wrong role",
+ "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE));
+
+ }
+
+ public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ String node1Url = _baseNodeRestUrl + NODE1;
+ String node2Url = _baseNodeRestUrl + NODE2;
+ String node3Url = _baseNodeRestUrl + NODE3;
+
+ assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ // Put virtualhost in STOPPED state
+ String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName;
+ assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE");
+ mutateDesiredState(virtualHostRestUrl, "STOPPED");
+ assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED");
+
+ // Now stop node 1 to cause an election between nodes 2 & 3
+ mutateDesiredState(node1Url, "STOPPED");
+ assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
+
+ Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url);
+
+ //Check the virtual host of the new master is in the stopped state
+ String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName;
+ assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED");
+ }
+
+ public void testDeleteReplicaNode() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ assertRemoteNodes(NODE1, NODE2, NODE3);
+
+ List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1);
+ assertEquals("Unexpected number of remote nodes on " + NODE1, 2, data.size());
+
+ int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "DELETE");
+ assertEquals("Unexpected response code on deletion of virtual host node " + NODE2, 200, responseCode);
+
+ int counter = 0;
+ while (data.size() != 1 && counter<50)
+ {
+ data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE1);
+ if (data.size() != 1)
+ {
+ Thread.sleep(100l);
+ }
+ counter++;
+ }
+ assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size());
+ }
+
+ public void testDeleteMasterNode() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE2, NODE3);
+
+ // change priority to make Node2 a master
+ int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", Collections.<String,Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, 100));
+ assertEquals("Unexpected response code on priority update of virtual host node " + NODE2, 200, responseCode);
+
+ List<Map<String,Object>> data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2);
+ assertEquals("Unexpected number of remote nodes on " + NODE2, 2, data.size());
+
+ // delete master
+ responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE");
+ assertEquals("Unexpected response code on deletion of virtual host node " + NODE1, 200, responseCode);
+
+ // wait for new master
+ waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
+
+ // delete remote node
+ responseCode = getRestTestHelper().submitRequest("replicationnode/" + NODE2 + "/" + NODE1, "DELETE");
+ assertEquals("Unexpected response code on deletion of remote node " + NODE1, 200, responseCode);
+
+ int counter = 0;
+ while (data.size() != 1 && counter<50)
+ {
+ data = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2);
+ if (data.size() != 1)
+ {
+ Thread.sleep(100l);
+ }
+ counter++;
+ }
+ assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size());
+ }
+
+ public void testIntruderBDBHAVHNNotAllowedNoConnect() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+
+ // add permitted node
+ Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort);
+ getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201);
+ assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE3);
+
+ int intruderPort = getNextAvailable(_node3HaPort + 1);
+
+ // try to add not permitted node
+ Map<String, Object> nodeData = createNodeAttributeMap(NODE2, intruderPort, _node1HaPort);
+ getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE2, "PUT", nodeData, 409);
+
+ assertRemoteNodes(NODE1, NODE3);
+ }
+
+ public void testIntruderProtection() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ assertNode(NODE1, _node1HaPort, _node1HaPort, NODE1);
+
+ Map<String,Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1);
+ String node1StorePath = (String)nodeData.get(BDBHAVirtualHostNode.STORE_PATH);
+ long transactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue();
+
+ // add permitted node
+ Map<String, Object> node3Data = createNodeAttributeMap(NODE3, _node3HaPort, _node1HaPort);
+ getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE3, "PUT", node3Data, 201);
+ assertNode(NODE3, _node3HaPort, _node1HaPort, NODE1);
+ assertRemoteNodes(NODE1, NODE3);
+
+ // Ensure PINGDB is created
+ // in order to exclude hanging of environment
+ // when environment.close is called whilst PINGDB is created.
+ // On node joining, a record is updated in PINGDB
+ // if lastTransactionId is incremented then node ping task was executed
+ int counter = 0;
+ long newTransactionId = transactionId;
+ while(newTransactionId == transactionId && counter<50)
+ {
+ nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + NODE1);
+ newTransactionId = ((Number)nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue();
+ if (newTransactionId != transactionId)
+ {
+ break;
+ }
+ counter++;
+ Thread.sleep(100l);
+ }
+
+ //connect intruder node
+ String nodeName = NODE2;
+ String nodeHostPort = "localhost:" + getNextAvailable(_node3HaPort + 1);
+ File environmentPathFile = new File(node1StorePath, nodeName);
+ environmentPathFile.mkdirs();
+ ReplicationConfig replicationConfig = new ReplicationConfig((String)nodeData.get(BDBHAVirtualHostNode.GROUP_NAME), nodeName, nodeHostPort);
+ replicationConfig.setHelperHosts((String)nodeData.get(BDBHAVirtualHostNode.ADDRESS));
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setDurability(Durability.parse((String)nodeData.get(BDBHAVirtualHostNode.DURABILITY)));
+
+ ReplicatedEnvironment intruder = null;
+ try
+ {
+ intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+ }
+ finally
+ {
+ if (intruder != null)
+ {
+ intruder.close();
+ }
+ }
+ waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
+ waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
+ }
+
+ private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
+ {
+ Map<String, Object> nodeData = createNodeAttributeMap(nodeName, nodePort, helperPort);
+
+ int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData);
+ assertEquals("Unexpected response code for virtual host node " + nodeName + " creation request", 201, responseCode);
+ String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
+ waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
+ }
+
+ private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception
+ {
+ Map<String, Object> nodeData = new HashMap<String, Object>();
+ nodeData.put(BDBHAVirtualHostNode.NAME, nodeName);
+ nodeData.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeData.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+ nodeData.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + nodePort);
+ nodeData.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + helperPort);
+ nodeData.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, NODE1);
+ Map<String,String> context = new HashMap<>();
+ nodeData.put(BDBHAVirtualHostNode.CONTEXT, context);
+ String bluePrint = GroupCreator.getBlueprint("localhost", _node1HaPort, _node2HaPort, _node3HaPort);
+ context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint);
+ return nodeData;
+ }
+
+ private void assertNode(String nodeName, int nodePort, int nodeHelperPort, String masterNode) throws Exception
+ {
+ boolean isMaster = nodeName.equals(masterNode);
+ String expectedRole = isMaster? "MASTER" : "REPLICA";
+ waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
+
+ Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0");
+ assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
+ assertEquals("Unexpected type", "BDB_HA", nodeData.get(BDBHAVirtualHostNode.TYPE));
+ assertEquals("Unexpected address", "localhost:" + nodePort, nodeData.get(BDBHAVirtualHostNode.ADDRESS));
+ assertEquals("Unexpected helper address", "localhost:" + nodeHelperPort, nodeData.get(BDBHAVirtualHostNode.HELPER_ADDRESS));
+ assertEquals("Unexpected group name", _hostName, nodeData.get(BDBHAVirtualHostNode.GROUP_NAME));
+ assertEquals("Unexpected role", expectedRole, nodeData.get(BDBHAVirtualHostNode.ROLE));
+
+ Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+ assertNotNull("Unexpected lastKnownReplicationId", lastKnownTransactionId);
+ assertTrue("Unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0);
+
+ Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME);
+ assertNotNull("Unexpected joinTime", joinTime);
+ assertTrue("Unexpected joinTime " + joinTime, joinTime > 0);
+
+ if (isMaster)
+ {
+ waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
+ }
+
+ }
+
+ private void assertRemoteNodes(String masterNode, String... replicaNodes) throws Exception
+ {
+ List<String> clusterNodes = new ArrayList<String>(Arrays.asList(replicaNodes));
+ clusterNodes.add(masterNode);
+
+ for (String clusterNodeName : clusterNodes)
+ {
+ List<String> remotes = new ArrayList<String>(clusterNodes);
+ remotes.remove(clusterNodeName);
+ for (String remote : remotes)
+ {
+ String remoteUrl = "replicationnode/" + clusterNodeName + "/" + remote;
+ Map<String, Object> nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+ assertRemoteNodeData(remote, nodeData);
+ }
+ }
+ }
+
+ private void assertRemoteNodeData(String name, Map<String, Object> nodeData)
+ {
+ assertEquals("Remote node " + name + " has unexpected name", name, nodeData.get(BDBHAVirtualHostNode.NAME));
+
+ Integer lastKnownTransactionId = (Integer) nodeData.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+ assertNotNull("Node " + name + " has unexpected lastKnownReplicationId", lastKnownTransactionId);
+ assertTrue("Node " + name + " has unexpected lastKnownReplicationId " + lastKnownTransactionId, lastKnownTransactionId > 0);
+
+ Long joinTime = (Long) nodeData.get(BDBHAVirtualHostNode.JOIN_TIME);
+ assertNotNull("Node " + name + " has unexpected joinTime", joinTime);
+ assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0);
+ }
+
+ private void assertActualAndDesiredStates(final String restUrl,
+ final String expectedDesiredState,
+ final String expectedActualState) throws IOException
+ {
+ Map<String, Object> objectData = getRestTestHelper().getJsonAsSingletonList(restUrl);
+ Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData);
+ }
+
+ private void mutateDesiredState(final String restUrl, final String newState) throws IOException
+ {
+ Map<String, Object> newAttributes = new HashMap<String, Object>();
+ newAttributes.put(VirtualHostNode.DESIRED_STATE, newState);
+
+ getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+ }
+
+ private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName)
+ {
+ Map<String, Object> foundNode = null;
+ for (Map<String, Object> remoteNode : remoteNodes)
+ {
+ if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME)))
+ {
+ foundNode = remoteNode;
+ break;
+ }
+ }
+ assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes.");
+ return foundNode;
+ }
+
+ private Map<String, Object> awaitNewMaster(final String... nodeUrls)
+ throws IOException, InterruptedException
+ {
+ Map<String, Object> newMasterData = null;
+ int counter = 0;
+ while (newMasterData == null && counter < 50)
+ {
+ for(String nodeUrl: nodeUrls)
+ {
+ Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl);
+ if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE)))
+ {
+ newMasterData = nodeData;
+ break;
+ }
+ }
+ if (newMasterData == null)
+ {
+ Thread.sleep(100l);
+ counter++;
+ }
+ }
+ assertNotNull("Could not find new master", newMasterData);
+ return newMasterData;
+ }
+
+
+}
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
new file mode 100644
index 0000000000..07ce033a55
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+import static org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.Asserts;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+public class BDBHAVirtualHostRestTest extends QpidRestTestCase
+{
+ private String _hostName;
+ private File _storeBaseDir;
+ private int _nodeHaPort;
+ private Object _nodeName;
+ private String _virtualhostUrl;
+ private String _bluePrint;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ setTestSystemProperty(ReplicatedEnvironmentFacade.REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, "1000");
+ _hostName = "ha";
+ _nodeName = "node1";
+ _storeBaseDir = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+ _nodeHaPort = getNextAvailable(getRestTestHelper().getHttpPort() + 1);
+ _virtualhostUrl = "virtualhost/" + _nodeName + "/" + _hostName;
+ _bluePrint = GroupCreator.getBlueprint("localhost", _nodeHaPort);
+
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeBaseDir != null)
+ {
+ FileUtils.delete(_storeBaseDir, true);
+ }
+ }
+ }
+
+ @Override
+ protected void customizeConfiguration() throws IOException
+ {
+ super.customizeConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST2_VIRTUALHOST);
+ config.removeObjectConfiguration(VirtualHostNode.class, TEST3_VIRTUALHOST);
+
+ Map<String, Object> nodeAttributes = new HashMap<String, Object>();
+ nodeAttributes.put(BDBHAVirtualHostNode.NAME, _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+ nodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, _storeBaseDir.getPath() + File.separator + _nodeName);
+ nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _hostName);
+ nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + _nodeHaPort);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, "localhost:" + _nodeHaPort);
+ nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, _nodeName);
+ Map<String, String> context = new HashMap<String,String>();
+ context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, _bluePrint);
+
+ nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context);
+ config.addObjectConfiguration(VirtualHostNode.class, nodeAttributes);
+ }
+
+ public void testSetLocalTransactionSynchronizationPolicy() throws Exception
+ {
+ Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
+
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+ assertEquals("Unexpected synchronization policy after change", "NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
+ }
+
+ public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
+ {
+ Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
+
+ Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, HttpServletResponse.SC_OK);
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+ assertEquals("Unexpected synchronization policy after change", "SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
+ }
+
+ public void testMutateState() throws Exception
+ {
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+
+ Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+ assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
+
+ newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
+ getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+
+ waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
+ }
+
+ private void assertActualAndDesireStates(final String restUrl,
+ final String expectedDesiredState,
+ final String expectedActualState) throws IOException
+ {
+ Map<String, Object> virtualhost = getRestTestHelper().getJsonAsSingletonList(restUrl);
+ Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhost);
+ }
+
+}
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
new file mode 100644
index 0000000000..e78ef34759
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
+import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
+import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.url.URLSyntaxException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.junit.Assert;
+
+import com.sleepycat.je.rep.ReplicationConfig;
+
+public class GroupCreator
+{
+ protected static final Logger LOGGER = Logger.getLogger(GroupCreator.class);
+
+ private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
+ private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+
+ private static final int FAILOVER_CYCLECOUNT = 10;
+ private static final int FAILOVER_RETRIES = 1;
+ private static final int FAILOVER_CONNECTDELAY = 1000;
+
+ private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
+ private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
+
+ private static final int RETRIES = 60;
+ private static final int CONNECTDELAY = 75;
+
+ private final QpidBrokerTestCase _testcase;
+ private final Map<Integer, Integer> _brokerPortToBdbPortMap = new TreeMap<Integer, Integer>();
+ private final String _virtualHostName;
+
+ private final String _ipAddressOfBroker;
+ private final String _groupName ;
+ private final int _numberOfNodes;
+ private int _bdbHelperPort;
+ private int _primaryBrokerPort;
+
+ public GroupCreator(QpidBrokerTestCase testcase, String virtualHostName, int numberOfNodes)
+ {
+ _testcase = testcase;
+ _virtualHostName = virtualHostName;
+ _groupName = virtualHostName;
+ _ipAddressOfBroker = getIpAddressOfBrokerHost();
+ _numberOfNodes = numberOfNodes;
+ _bdbHelperPort = 0;
+ }
+
+ public void configureClusterNodes() throws Exception
+ {
+ int brokerPort = _testcase.findFreePort();
+
+ int[] bdbPorts = new int[_numberOfNodes];
+ for (int i = 0; i < _numberOfNodes; i++)
+ {
+ int bdbPort = _testcase.getNextAvailable(brokerPort + 1);
+ bdbPorts[i] = bdbPort;
+ _brokerPortToBdbPortMap.put(brokerPort, bdbPort);
+ brokerPort = _testcase.getNextAvailable(bdbPort + 1);
+ }
+
+ String bluePrintJson = getBlueprint(_ipAddressOfBroker, bdbPorts);
+
+ String helperName = null;
+ for (Map.Entry<Integer,Integer> entry: _brokerPortToBdbPortMap.entrySet())
+ {
+ brokerPort = entry.getKey();
+ int bdbPort = entry.getValue();
+ LOGGER.debug("Cluster broker port " + brokerPort + ", bdb replication port " + bdbPort);
+ if (_bdbHelperPort == 0)
+ {
+ _bdbHelperPort = bdbPort;
+ }
+
+ String nodeName = getNodeNameForNodeAt(bdbPort);
+ if (helperName == null)
+ {
+ helperName = nodeName;
+ }
+
+ Map<String, Object> virtualHostNodeAttributes = new HashMap<String, Object>();
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.STORE_PATH, System.getProperty("QPID_WORK") + File.separator + brokerPort);
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, _groupName);
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName);
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, getNodeHostPortForNodeAt(bdbPort));
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, getHelperHostPort());
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, helperName);
+
+ Map<String, String> context = new HashMap<>();
+ context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+ context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
+ context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrintJson);
+ virtualHostNodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, context);
+
+ TestBrokerConfiguration brokerConfiguration = _testcase.getBrokerConfiguration(brokerPort);
+ brokerConfiguration.addJmxManagementConfiguration();
+ brokerConfiguration.addHttpManagementConfiguration();
+ brokerConfiguration.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+ brokerConfiguration.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, _testcase.getHttpManagementPort(brokerPort));
+ brokerConfiguration.setObjectAttributes(VirtualHostNode.class, _virtualHostName, virtualHostNodeAttributes);
+
+ }
+ _primaryBrokerPort = getPrimaryBrokerPort();
+ }
+
+ public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+ TestBrokerConfiguration config = _testcase.getBrokerConfiguration(_primaryBrokerPort);
+ String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(_primaryBrokerPort));
+ config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.DESIGNATED_PRIMARY, designatedPrimary);
+ config.setSaved(false);
+ }
+
+ private int getPrimaryBrokerPort()
+ {
+ return _brokerPortToBdbPortMap.keySet().iterator().next();
+ }
+
+ public void startNode(final int brokerPortNumber) throws Exception
+ {
+ _testcase.startBroker(brokerPortNumber);
+ }
+
+ public void startCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
+ {
+ startNode(brokerPortNumber);
+ }
+ }
+
+ public void startClusterParallel() throws Exception
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(_brokerPortToBdbPortMap.size());
+ try
+ {
+ List<Future<Object>> brokers = new CopyOnWriteArrayList<Future<Object>>();
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
+ {
+ final TestBrokerConfiguration brokerConfig = _testcase.getBrokerConfiguration(brokerPortNumber);
+ Future<Object> future = executor.submit(new Callable<Object>()
+ {
+ public Object call()
+ {
+ try
+ {
+ _testcase.startBroker(brokerPortNumber, brokerConfig);
+ return "OK";
+ }
+ catch (Exception e)
+ {
+ return e;
+ }
+ }
+ });
+ brokers.add(future);
+ }
+ for (Future<Object> future : brokers)
+ {
+ Object result = future.get(30, TimeUnit.SECONDS);
+ LOGGER.debug("Node startup result:" + result);
+ if (result instanceof Exception)
+ {
+ throw (Exception) result;
+ }
+ else if (!"OK".equals(result))
+ {
+ throw new Exception("One of the cluster nodes is not started");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ stopCluster();
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+
+ }
+
+ public void stopNode(final int brokerPortNumber)
+ {
+ _testcase.killBroker(brokerPortNumber);
+ }
+
+ public void stopCluster() throws Exception
+ {
+ for (final Integer brokerPortNumber : _brokerPortToBdbPortMap.keySet())
+ {
+ try
+ {
+ stopNode(brokerPortNumber);
+ }
+ catch(Exception e)
+ {
+ LOGGER.warn("Failed to stop node on port:" + brokerPortNumber);
+ }
+ }
+ }
+
+ public int getBrokerPortNumberFromConnection(Connection connection)
+ {
+ final AMQConnection amqConnection = (AMQConnection)connection;
+ return amqConnection.getActiveBrokerDetails().getPort();
+ }
+
+ public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
+ {
+ final Set<Integer> allBrokerPorts = _testcase.getBrokerPortNumbers();
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int activeBrokerPort = getBrokerPortNumberFromConnection(activeConnection);
+ allBrokerPorts.remove(activeBrokerPort);
+ LOGGER.debug("Broker ports:" + allBrokerPorts);
+ final int inactiveBrokerPort = allBrokerPorts.iterator().next();
+ return inactiveBrokerPort;
+ }
+
+ public int getBdbPortForBrokerPort(final int brokerPortNumber)
+ {
+ return _brokerPortToBdbPortMap.get(brokerPortNumber);
+ }
+
+ public Set<Integer> getBdbPortNumbers()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.values());
+ }
+
+ public ConnectionURL getConnectionUrlForAllClusterNodes() throws Exception
+ {
+ return getConnectionUrlForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT);
+ }
+
+ public ConnectionURL getConnectionUrlForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception
+ {
+ final StringBuilder brokerList = new StringBuilder();
+
+ for(Iterator<Integer> itr = _brokerPortToBdbPortMap.keySet().iterator(); itr.hasNext(); )
+ {
+ int brokerPortNumber = itr.next();
+
+ brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, connectDelay, retries));
+ if (itr.hasNext())
+ {
+ brokerList.append(";");
+ }
+ }
+
+ return new AMQConnectionURL(String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, cyclecount));
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithoutRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, false);
+ }
+
+ public AMQConnectionURL getConnectionUrlForSingleNodeWithRetry(final int brokerPortNumber) throws URLSyntaxException
+ {
+ return getConnectionUrlForSingleNode(brokerPortNumber, true);
+ }
+
+ private AMQConnectionURL getConnectionUrlForSingleNode(final int brokerPortNumber, boolean retryAllowed) throws URLSyntaxException
+ {
+ final String url;
+ if (retryAllowed)
+ {
+ url = String.format(SINGLE_BROKER_URL_WITH_RETRY_FORMAT, _virtualHostName, brokerPortNumber, CONNECTDELAY, RETRIES);
+ }
+ else
+ {
+ url = String.format(SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT, _virtualHostName, brokerPortNumber);
+ }
+
+ return new AMQConnectionURL(url);
+ }
+
+ public String getGroupName()
+ {
+ return _groupName;
+ }
+
+ public String getNodeNameForNodeAt(final int bdbPort)
+ {
+ return "node" + _testcase.getName() + bdbPort;
+ }
+
+ public String getNodeHostPortForNodeAt(final int bdbPort)
+ {
+ return _ipAddressOfBroker + ":" + bdbPort;
+ }
+
+ public String getHelperHostPort()
+ {
+ if (_bdbHelperPort == 0)
+ {
+ throw new IllegalStateException("Helper port not yet assigned.");
+ }
+
+ return _ipAddressOfBroker + ":" + _bdbHelperPort;
+ }
+
+ public void setHelperHostPort(int bdbHelperPort)
+ {
+ _bdbHelperPort = bdbHelperPort;
+ }
+
+ public int getBrokerPortNumberOfPrimary()
+ {
+ if (_numberOfNodes != 2)
+ {
+ throw new IllegalArgumentException("Only two nodes groups have the concept of primary");
+ }
+
+ return _primaryBrokerPort;
+ }
+
+ public int getBrokerPortNumberOfSecondaryNode()
+ {
+ final Set<Integer> portNumbers = getBrokerPortNumbersForNodes();
+ portNumbers.remove(getBrokerPortNumberOfPrimary());
+ return portNumbers.iterator().next();
+ }
+
+ public Set<Integer> getBrokerPortNumbersForNodes()
+ {
+ return new HashSet<Integer>(_brokerPortToBdbPortMap.keySet());
+ }
+
+
+ public String getIpAddressOfBrokerHost()
+ {
+ String brokerHost = _testcase.getBroker().getHost();
+ try
+ {
+ return InetAddress.getByName(brokerHost).getHostAddress();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Could not determine IP address of host : " + brokerHost, e);
+ }
+ }
+
+ public void modifyClusterNodeBdbAddress(int brokerPortNumberToBeMoved, int newBdbPort)
+ {
+ TestBrokerConfiguration config = _testcase.getBrokerConfiguration(brokerPortNumberToBeMoved);
+ String nodeName = getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPortNumberToBeMoved));
+
+ Map<String, Object> objectAttributes = config.getObjectAttributes(VirtualHostNode.class, nodeName);
+
+ String oldBdbHostPort = (String)objectAttributes.get(BDBHAVirtualHostNode.ADDRESS);
+ String[] oldHostAndPort = StringUtils.split(oldBdbHostPort, ":");
+ String oldHost = oldHostAndPort[0];
+ String newBdbHostPort = oldHost + ":" + newBdbPort;
+ config.setObjectAttribute(VirtualHostNode.class, nodeName, BDBHAVirtualHostNode.ADDRESS, newBdbHostPort);
+ config.setSaved(false);
+ }
+
+ public String getNodeNameForBrokerPort(final int brokerPort)
+ {
+ return getNodeNameForNodeAt(_brokerPortToBdbPortMap.get(brokerPort));
+ }
+
+ public void setNodeAttributes(int brokerPort, Map<String, Object> attributeMap)
+ throws Exception
+ {
+ setNodeAttributes(brokerPort, brokerPort, attributeMap);
+ }
+
+ public void setNodeAttributes(int localNodePort, int remoteNodePort, Map<String, Object> attributeMap)
+ throws Exception
+ {
+ RestTestHelper restHelper = createRestTestHelper(localNodePort);
+ String url = getNodeRestUrl(localNodePort, remoteNodePort);
+ int status = restHelper.submitRequest(url, "PUT", attributeMap);
+ if (status != 200)
+ {
+ throw new Exception("Unexpected http status when updating " + getNodeNameForBrokerPort(remoteNodePort) + " attribute(s) : " + status);
+ }
+ }
+
+ private String getNodeRestUrl(int localNodePort, int remoteNodePort)
+ {
+ String remoteNodeName = getNodeNameForBrokerPort(remoteNodePort);
+ String localNodeName = getNodeNameForBrokerPort(localNodePort);
+ String url = null;
+ if (localNodePort == remoteNodePort)
+ {
+ url = "/api/latest/virtualhostnode/" + localNodeName;
+ }
+ else
+ {
+ url = "/api/latest/replicationnode/" + localNodeName + "/" + remoteNodeName;
+ }
+ return url;
+ }
+
+ public Map<String, Object> getNodeAttributes(int brokerPort) throws IOException
+ {
+ return getNodeAttributes(brokerPort, brokerPort);
+ }
+
+ public Map<String, Object> getNodeAttributes(int localNodePort, int remoteNodePort) throws IOException
+ {
+ RestTestHelper restHelper = createRestTestHelper(localNodePort);
+ List<Map<String, Object>> results= restHelper.getJsonAsList(getNodeRestUrl(localNodePort, remoteNodePort));
+ int size = results.size();
+ if (size == 0)
+ {
+ return Collections.emptyMap();
+ }
+ else if (size == 1)
+ {
+ return results.get(0);
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected number of nodes " + size);
+ }
+ }
+
+ public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
+ {
+ awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole);
+ }
+
+ public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ Map<String, Object> data = Collections.emptyMap();
+
+ while(!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
+ {
+ LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredRole + " role");
+ data = getNodeAttributes(localNodePort, remoteNodePort);
+ if (!desiredRole.equals(data.get(BDBHARemoteReplicationNode.ROLE)))
+ {
+ Thread.sleep(1000);
+ }
+ }
+ LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' role is " + data.get(BDBHARemoteReplicationNode.ROLE));
+ Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(BDBHARemoteReplicationNode.ROLE));
+ }
+
+ public RestTestHelper createRestTestHelper(int brokerPort)
+ {
+ int httpPort = _testcase.getHttpManagementPort(brokerPort);
+ RestTestHelper helper = new RestTestHelper(httpPort);
+ helper.setUsernameAndPassword("webadmin", "webadmin");
+ return helper;
+ }
+
+ public static String getBlueprint(String hostName, int... ports) throws Exception
+ {
+ List<String> permittedNodes = new ArrayList<String>();
+ for (int port:ports)
+ {
+ permittedNodes.add(hostName + ":" + port);
+ }
+ Map<String,Object> bluePrint = new HashMap<>();
+ bluePrint.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
+ bluePrint.put(BDBHAVirtualHost.PERMITTED_NODES, permittedNodes);
+
+ StringWriter writer = new StringWriter();
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ mapper.writeValue(writer, bluePrint);
+ return writer.toString();
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java
new file mode 100644
index 0000000000..c6f005c0e7
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/JMXManagementTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.junit.Assert;
+
+/**
+ * System test verifying the ability to control a cluster via the Management API.
+ *
+ * @see MultiNodeTest
+ */
+public class JMXManagementTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(JMXManagementTest.class);
+
+ private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
+ private static final int NUMBER_OF_NODES = 4;
+
+ private final GroupCreator _clusterCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ _clusterCreator.configureClusterNodes();
+ _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
+ _clusterCreator.startCluster();
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testReadonlyMBeanAttributes() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+ final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
+ assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
+ assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
+ assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
+ // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
+ assertNotNull("Store state must not be null", storeBean.getNodeState());
+ }
+
+ public void testStateOfActiveBrokerIsMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
+ assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
+ }
+
+ public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+ final String nodeState = storeBean.getNodeState();
+ assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
+ }
+
+ public void testGroupMembers() throws Exception
+ {
+ final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
+
+ ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
+ awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
+
+ final TabularData groupMembers = storeBean.getAllNodesInGroup();
+ assertNotNull(groupMembers);
+
+ for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
+ {
+ final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
+ final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);
+
+ CompositeData row = groupMembers.get(new Object[] {nodeName});
+ assertNotNull("Table does not contain row for node name " + nodeName, row);
+ assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
+ }
+ }
+
+ public void testRemoveRemoteNodeFromGroup() throws Exception
+ {
+ final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
+ final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
+ final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
+ awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
+
+ final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
+
+ storeBean.removeNodeFromGroup(removedNodeName);
+
+ long limitTime = System.currentTimeMillis() + 5000;
+ while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime)
+ {
+ Thread.sleep(100l);
+ }
+
+ int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval);
+ }
+
+ public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
+ {
+ final Connection activeConnection = getConnection(_brokerFailoverUrl);
+ final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
+
+ ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
+
+ try
+ {
+ inactiveBroker.createNewQueue(getTestQueueName(), null, true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message);
+ }
+
+ try
+ {
+ inactiveBroker.createNewExchange(getName(), "direct", true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message);
+ }
+ }
+
+ public void testSetDesignatedPrimary() throws Exception
+ {
+ int brokerPort = _clusterCreator.getBrokerPortNumbersForNodes().iterator().next();
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort);
+ assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+ long limit = System.currentTimeMillis() + 5000;
+ while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100l);
+ }
+ assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary());
+ }
+
+ public void testVirtualHostMbeanOnMasterTransfer() throws Exception
+ {
+ Connection connection = getConnection(_brokerFailoverUrl);
+ int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ connection.close();
+
+ Set<Integer> ports = _clusterCreator.getBrokerPortNumbersForNodes();
+ ports.remove(activeBrokerPort);
+
+ int inactiveBrokerPort = ports.iterator().next();
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ ManagedBroker inactiveVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort);
+
+ try
+ {
+ inactiveVirtualHostMBean.createNewQueue(getTestQueueName(), null, true);
+ fail("Exception not thrown");
+ }
+ catch (Exception e)
+ {
+ String message = e.getMessage();
+ assertEquals("The virtual host state of UNAVAILABLE does not permit this operation.", message);
+ }
+
+ Map<String, Object> attributes = _clusterCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ _clusterCreator.setNodeAttributes(inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _clusterCreator.awaitNodeToAttainRole(inactiveBrokerPort, "MASTER");
+
+ awaitVirtualHostAtNode(inactiveBrokerPort);
+
+ ManagedBroker activeVirtualHostMBean = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPort);
+ activeVirtualHostMBean.createNewQueue(getTestQueueName() + inactiveBrokerPort, null, true);
+ }
+
+ public void awaitVirtualHostAtNode(int brokerPort) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ Map<String, Object> data = Collections.emptyMap();
+ String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort);
+ RestTestHelper restHelper = _clusterCreator.createRestTestHelper(brokerPort);
+ while(!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)) && (System.currentTimeMillis() - startTime) < 30000)
+ {
+ LOGGER.debug("Awaiting virtual host '" + nodeName + "' to transit into active state");
+ List<Map<String, Object>> results= restHelper.getJsonAsList("virtualhost/" + nodeName + "/" + VIRTUAL_HOST);
+ if (results.size()== 1)
+ {
+ data = results.get(0);
+ }
+
+ if (!State.ACTIVE.name().equals(data.get(VirtualHost.STATE)))
+ {
+ Thread.sleep(1000);
+ }
+ }
+ Assert.assertEquals("Virtual host is not active", State.ACTIVE.name(), data.get(VirtualHost.STATE));
+ LOGGER.debug("Virtual host '" + nodeName + "' is in active state");
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(brokerPortNumber);
+
+ return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ }
+
+ private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(brokerPortNumber);
+
+ return _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+ }
+
+ private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception
+ {
+ long totalTimeWaited = 0l;
+ long waitInterval = 100l;
+ long maxWaitTime = 10000;
+
+ int currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+ while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime)
+ {
+ LOGGER.debug("Still awaiting nodes to join group; expecting "
+ + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes
+ + " after " + totalTimeWaited + " ms.");
+
+ totalTimeWaited += waitInterval;
+ Thread.sleep(waitInterval);
+
+ currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
+ }
+
+ assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms",
+ expectedNumberOfNodes ,currentNumberOfNodes);
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
new file mode 100644
index 0000000000..d6ba419de1
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestUtils;
+
+/**
+ * The HA black box tests test the BDB cluster as a opaque unit. Client connects to
+ * the cluster via a failover url
+ */
+public class MultiNodeTest extends QpidBrokerTestCase
+{
+ protected static final Logger LOGGER = Logger.getLogger(MultiNodeTest.class);
+
+ private static final String VIRTUAL_HOST = "test";
+ private static final int NUMBER_OF_NODES = 3;
+
+ private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+
+ private FailoverAwaitingListener _failoverListener;
+
+ /** Used when expectation is client will (re)-connect */
+ private ConnectionURL _positiveFailoverUrl;
+
+ /** Used when expectation is client will not (re)-connect */
+ private ConnectionURL _negativeFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+
+ _groupCreator.configureClusterNodes();
+
+ _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
+ _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2);
+
+ _groupCreator.startCluster();
+ _failoverListener = new FailoverAwaitingListener();
+
+ super.setUp();
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ public void testLossOfMasterNodeCausesClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ _groupCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Node is stopped");
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+ final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+
+ LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+
+ _groupCreator.stopNode(inactiveBrokerPort);
+
+ _failoverListener.assertNoFailoverCompletionWithin(2000);
+
+ assertProducingConsuming(connection);
+ }
+
+ public void testLossOfQuorumCausesClientDisconnection() throws Exception
+ {
+ final Connection connection = getConnection(_negativeFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ ports.remove(activeBrokerPort);
+
+ // Stop all other nodes
+ for (Integer p : ports)
+ {
+ _groupCreator.stopNode(p);
+ }
+
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ session.createConsumer(destination).close();
+ fail("Exception not thrown - creating durable queue should fail without quorum");
+ }
+ catch(JMSException jms)
+ {
+ // PASS
+ }
+
+ // New connections should now fail as vhost will be unavailable
+ try
+ {
+ getConnection(_negativeFailoverUrl);
+ fail("Exception not thrown");
+ }
+ catch (JMSException je)
+ {
+ // PASS
+ }
+ }
+
+ public void testPersistentMessagesAvailableAfterFailover() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+
+ Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination queue = producingSession.createQueue(getTestQueueName());
+ producingSession.createConsumer(queue).close();
+ sendMessage(producingSession, queue, 10);
+
+ _groupCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Old master (broker port " + activeBrokerPort + ") is stopped");
+
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Failover has finished");
+
+ final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("New master (broker port " + activeBrokerPort + ") after failover");
+
+ Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumingSession.createConsumer(queue);
+
+ connection.start();
+ for(int i = 0; i < 10; i++)
+ {
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message " + i + " is not received", m);
+ assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
+ }
+ consumingSession.commit();
+ }
+
+ public void testTransferMasterFromLocalNode() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ _groupCreator.setNodeAttributes(inactiveBrokerPort,
+ Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ }
+
+ public void testTransferMasterFromRemoteNode() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ }
+
+ public void testQuorumOverride() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ ports.remove(activeBrokerPort);
+
+ // Stop all other nodes
+ for (Integer p : ports)
+ {
+ _groupCreator.stopNode(p);
+ }
+
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
+ assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+ _groupCreator.setNodeAttributes(activeBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
+
+ attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
+ assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+
+ assertProducingConsuming(connection);
+ }
+
+ public void testPriority() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverListener);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ int priority = 1;
+ Integer highestPriorityBrokerPort = null;
+ Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+ for (Integer port : ports)
+ {
+ if (activeBrokerPort != port.intValue())
+ {
+ priority = priority + 1;
+ highestPriorityBrokerPort = port;
+ _groupCreator.setNodeAttributes(port, port, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.PRIORITY, priority));
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port);
+ assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY));
+ }
+ }
+
+ LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority);
+
+ LOGGER.info("Shutting down the MASTER");
+ _groupCreator.stopNode(activeBrokerPort);
+
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Listener has finished");
+
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ assertProducingConsuming(connection);
+ }
+
+ private final class FailoverAwaitingListener implements ConnectionListener
+ {
+ private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1);
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public void awaitFailoverCompletion(long delay) throws InterruptedException
+ {
+ if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
+ {
+ LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+ }
+ assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
+ }
+
+ public void assertNoFailoverCompletionWithin(long delay) throws InterruptedException
+ {
+ _failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS);
+ assertEquals("Failover occurred unexpectedly", 1L, _failoverCompletionLatch.getCount());
+ }
+
+ @Override
+ public void failoverComplete()
+ {
+ _failoverCompletionLatch.countDown();
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ }
+
+ @Override
+ public void bytesReceived(long count)
+ {
+ }
+ }
+
+}
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
new file mode 100644
index 0000000000..0f8a1609de
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.management.ObjectName;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class TwoNodeTest extends QpidBrokerTestCase
+{
+ private static final String VIRTUAL_HOST = "test";
+
+ private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
+ private static final int NUMBER_OF_NODES = 2;
+
+ private final GroupCreator _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
+ private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);
+
+ private ConnectionURL _brokerFailoverUrl;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _brokerType = BrokerType.SPAWNED;
+
+ assertTrue(isJavaBroker());
+ assertTrue(isBrokerStorePersistent());
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ public void startBroker() throws Exception
+ {
+ // Don't start default broker provided by QBTC.
+ }
+
+ private void startCluster(boolean designedPrimary) throws Exception
+ {
+ setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
+ _groupCreator.configureClusterNodes();
+ _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
+ _brokerFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
+ _groupCreator.startCluster();
+ }
+
+ public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
+ {
+ startCluster(true);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _groupCreator.stopCluster();
+ _groupCreator.startNode(masterPort);
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testClusterRestartWithoutDesignatedPrimary() throws Exception
+ {
+ startCluster(false);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _groupCreator.stopCluster();
+ _groupCreator.startClusterParallel();
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
+ }
+
+ public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ {
+ startCluster(true);
+ _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception
+ {
+ startCluster(false);
+ _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+
+ try
+ {
+ Connection connection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(connection);
+ fail("Exception not thrown");
+ }
+ catch(JMSException e)
+ {
+ // JMSException should be thrown either on getConnection, or produce/consume
+ // depending on whether the relative timing of the node discovering that the
+ // secondary has gone.
+ }
+ }
+
+ public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
+
+ try
+ {
+ getConnection(_brokerFailoverUrl);
+ fail("Connection not expected");
+ }
+ catch (JMSException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore primaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfPrimary());
+ assertTrue("Expected primary node to be set as designated primary", primaryStoreBean.getDesignatedPrimary());
+
+ final ManagedBDBHAMessageStore secondaryStoreBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ assertFalse("Expected secondary node to NOT be set as designated primary", secondaryStoreBean.getDesignatedPrimary());
+ }
+
+ public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception
+ {
+ startCluster(true);
+ final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
+
+ assertFalse("Expected node to NOT be set as designated primary", storeBean.getDesignatedPrimary());
+ storeBean.setDesignatedPrimary(true);
+
+ long limit = System.currentTimeMillis() + 5000;
+ while( !storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100);
+ }
+ assertTrue("Expected node to now be set as designated primary", storeBean.getDesignatedPrimary());
+
+ final Connection connection = getConnection(_brokerFailoverUrl);
+ assertNotNull("Expected to get a valid connection to new primary", connection);
+ assertProducingConsuming(connection);
+ }
+
+ private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(
+ final int activeBrokerPortNumber) throws Exception
+ {
+ _jmxUtils.open(activeBrokerPortNumber);
+
+ ManagedBDBHAMessageStore storeBean = _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
+ return storeBean;
+ }
+
+}