summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-05 15:40:50 +0000
committerKeith Wall <kwall@apache.org>2014-02-05 15:40:50 +0000
commit4d7e126fd91bb6024840cb716e89f358cf8b755b (patch)
tree36b9a314f311e05d4efeb55d7754c9065b39e636
parenta2ff3ed02e5c94c7b669a1be79505d831c017a19 (diff)
downloadqpid-python-4d7e126fd91bb6024840cb716e89f358cf8b755b.tar.gz
QPID-5409: Add functionality into RemoteReplicationNode to change role attribute from REPLICA to MASTER
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564813 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java99
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java11
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java133
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java29
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java15
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java35
6 files changed, 299 insertions, 23 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
index 06fffddc0a..594dd8978f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
@@ -21,11 +21,18 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import java.io.IOException;
+import java.lang.reflect.Type;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -36,9 +43,12 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AbstractAdapter;
import org.apache.qpid.server.model.adapter.NoStatistics;
+import org.apache.qpid.server.util.MapValueConverter;
import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
/**
@@ -48,23 +58,32 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
{
private static final Logger LOGGER = Logger.getLogger(RemoteReplicationNode.class);
+ @SuppressWarnings("serial")
+ private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>()
+ {{
+ put(ROLE, String.class);
+ }};
+
private final com.sleepycat.je.rep.ReplicationNode _replicationNode;
- private final VirtualHost _virtualHost;
private final String _hostPort;
private final String _groupName;
+ private final DbPing _dbPing;
+ private final ReplicationGroupAdmin _replicationGroupAdmin;
private volatile String _role;
private volatile long _joinTime;
private volatile long _lastTransactionId;
- public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, TaskExecutor taskExecutor)
+ public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost,
+ TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin)
{
super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor);
addParent(VirtualHost.class, virtualHost);
_groupName = groupName;
_hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
_replicationNode = replicationNode;
- _virtualHost = virtualHost;
+ _dbPing = dbPing;
+ _replicationGroupAdmin = admin;
}
@Override
@@ -154,7 +173,6 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
}
}
-
@Override
public Object getAttribute(String name)
{
@@ -207,15 +225,13 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
public void updateNodeState()
{
- Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
- DbPing ping = new DbPing(_replicationNode, _groupName, monitorTimeout.intValue());
String oldRole = _role;
long oldJoinTime = _joinTime;
long oldTransactionId = _lastTransactionId;
try
{
- NodeState state = ping.getNodeState();
+ NodeState state = _dbPing.getNodeState();
_role = state.getNodeState().name();
_joinTime = state.getJoinTime();
_lastTransactionId = state.getCurrentTxnEndVLSN();
@@ -252,4 +268,73 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
{
return ReplicationNode.AVAILABLE_ATTRIBUTES;
}
+
+ @Override
+ public void changeAttributes(Map<String, Object> attributes)
+ throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ checkWhetherImmutableAttributeChanged(attributes);
+ Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+
+ if (convertedAttributes.containsKey(ROLE))
+ {
+ String currentRole = (String)getAttribute(ROLE);
+ if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership when not a replica");
+ }
+
+ String role = (String)convertedAttributes.get(ROLE);
+
+ if (ReplicatedEnvironment.State.MASTER.name().equals(role) )
+ {
+ try
+ {
+ String nodeName = getName();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Trying to transfer master to " + nodeName);
+ }
+
+ _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("The mastership has been transfered to " + nodeName);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e);
+ }
+ }
+ else
+ {
+ throw new IllegalConfigurationException("Changing role to other value then "
+ + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+ }
+ }
+
+ super.changeAttributes(convertedAttributes);
+ }
+
+ private void checkWhetherImmutableAttributeChanged(Map<String, Object> attributes)
+ {
+ Set<String> immutableAttributeNames = new HashSet<String>(getAttributeNames());
+ immutableAttributeNames.remove(ROLE);
+ for (String attributeName : immutableAttributeNames)
+ {
+ if (attributes.containsKey(attributeName))
+ {
+ // the name is appended into attributes map in REST layer
+ if (attributeName.equals(NAME) && getName().equals(attributes.get(NAME)))
+ {
+ continue;
+ }
+ throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName);
+ }
+ }
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 26651ac64c..afbe1e2187 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -32,6 +33,8 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -82,7 +85,13 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
attributes.put(ReplicationNode.NAME, replicationNode.getName());
attributes.put(ReplicationNode.GROUP_NAME, groupName);
attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort());
- return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor());
+
+ Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
+ DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue());
+
+ ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress()));
+
+ return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin);
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
new file mode 100644
index 0000000000..f65fecd201
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
@@ -0,0 +1,133 @@
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.apache.qpid.server.model.ReplicationNode.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class RemoteReplicationNodeTest extends QpidTestCase
+{
+
+ private RemoteReplicationNode _node;
+ private String _groupName;
+ private VirtualHost _virtualHost;
+ private TaskExecutor _taskExecutor;
+ private ReplicationNode _replicationNode;
+ private String _nodeName;
+ private int _port;
+ private DbPing _dbPing;
+ private ReplicationGroupAdmin _remoteReplicationAdmin;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _groupName = getTestName();
+ _nodeName = getTestName() + "Name";
+ _port = 5000;
+ _replicationNode = mock(ReplicationNode.class);
+ _virtualHost = mock(VirtualHost.class);
+ _taskExecutor = mock(TaskExecutor.class);
+ _dbPing = mock(DbPing.class);
+ _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class);
+
+ when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_replicationNode.getName()).thenReturn(_nodeName);
+ when(_replicationNode.getHostName()).thenReturn("localhost");
+ when(_replicationNode.getPort()).thenReturn(_port);
+
+ _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin);
+ }
+
+ public void testGetAttribute() throws Exception
+ {
+ State state = State.MASTER;
+ long joinTime = System.currentTimeMillis();
+ long currentTxnEndVLSN = 3;
+
+ updateNodeState(state, joinTime, currentTxnEndVLSN);
+
+ assertEquals("Unexpected name", _nodeName, _node.getAttribute(NAME));
+ assertEquals("Unexpected group name", _groupName, _node.getAttribute(GROUP_NAME));
+ assertEquals("Unexpected state", state.name(), _node.getAttribute(ROLE));
+ assertEquals("Unexpected transaction id", currentTxnEndVLSN, _node.getAttribute(LAST_KNOWN_REPLICATION_TRANSACTION_ID));
+ assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testSetRoleAttribute() throws Exception
+ {
+ updateNodeState();
+ _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
+ when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName);
+
+ verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class));
+ }
+
+ public void testSetImmutableAttributesThrowException() throws Exception
+ {
+ Map<String, Object> changeAttributeMap = new HashMap<String, Object>();
+ changeAttributeMap.put(GROUP_NAME, "newGroupName");
+ changeAttributeMap.put(HELPER_HOST_PORT, "newhost:1234");
+ changeAttributeMap.put(HOST_PORT, "newhost:1234");
+ changeAttributeMap.put(COALESCING_SYNC, Boolean.FALSE);
+ changeAttributeMap.put(DURABILITY, "durability");
+ changeAttributeMap.put(JOIN_TIME, 1000l);
+ changeAttributeMap.put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, 10001l);
+ changeAttributeMap.put(NAME, "newName");
+ changeAttributeMap.put(STORE_PATH, "/not/used");
+ changeAttributeMap.put(PARAMETERS, Collections.emptyMap());
+ changeAttributeMap.put(REPLICATION_PARAMETERS, Collections.emptyMap());
+
+ for (Entry<String, Object> entry : changeAttributeMap.entrySet())
+ {
+ assertSetAttributesThrowsException(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void assertSetAttributesThrowsException(String attributeName, Object attributeValue) throws Exception
+ {
+ updateNodeState();
+
+ try
+ {
+ _node.setAttributes(Collections.<String, Object>singletonMap(attributeName, attributeValue));
+ fail("Operation to change attribute '" + attributeName + "' should fail");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ private void updateNodeState() throws Exception
+ {
+ updateNodeState( State.REPLICA, System.currentTimeMillis(), 3);
+ }
+
+ private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception
+ {
+ NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0);
+ when(_dbPing.getNodeState()).thenReturn(nodeState);
+ _node.updateNodeState();
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index 995af791e9..84b8de7be9 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -120,7 +120,7 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- public void testTransferMaster() throws Exception
+ public void testTransferMasterFromLocalNode() throws Exception
{
final Connection connection = getConnection(_brokerFailoverUrl);
@@ -145,6 +145,33 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
assertProducingConsuming(connection);
}
+ public void testTransferMasterFromRemoteNode() throws Exception
+ {
+ final Connection connection = getConnection(_brokerFailoverUrl);
+
+ ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+ final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ String nodeName = _clusterCreator.getNodeNameForBrokerPort(inactiveBrokerPort);
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
+ Map<String, Object> attributes = _clusterCreator.getReplicationNodeAttributes(activeBrokerPort, nodeName);
+ assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(ReplicationNode.ROLE));
+
+ _clusterCreator.setReplicationNodeAttributes(activeBrokerPort, nodeName, Collections.<String, Object>singletonMap(ReplicationNode.ROLE, "MASTER"));
+
+ _failoverAwaitingListener.assertFailoverOccurs(20000);
+ LOGGER.info("Listener has finished");
+
+ attributes = _clusterCreator.getReplicationNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
+
+ assertProducingConsuming(connection);
+ }
public void testQuorumOverride() throws Exception
{
final Connection connection = getConnection(_brokerFailoverUrl);
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
index 2b4b8beb9d..0031b024ba 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -221,19 +221,8 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
private void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
{
- final long startTime = System.currentTimeMillis();
- Map<String, Object> data = Collections.emptyMap();
-
- while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
- {
- _logger.debug("Awaiting node to transit into " + desiredRole + " role");
- data = _clusterCreator.getReplicationNodeAttributes(brokerPort);
- if (!desiredRole.equals(data.get(ReplicationNode.ROLE)))
- {
- Thread.sleep(1000);
- }
- }
- assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE));
+ String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort);
+ _clusterCreator.awaitNodeToAttainRole(brokerPort, nodeName, desiredRole);
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 71a0373798..824eaf0a3d 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -20,8 +20,10 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -39,6 +41,8 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import junit.framework.Assert;
+
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionURL;
@@ -50,6 +54,8 @@ import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.url.URLSyntaxException;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
public class HATestClusterCreator
{
@@ -384,8 +390,14 @@ public class HATestClusterCreator
public void setReplicationNodeAttributes(int brokerPort, Map<String, Object> attributeMap) throws Exception
{
- RestTestHelper restHelper = createRestTestHelper(brokerPort);
String replicationNodeName = getNodeNameForBrokerPort(brokerPort);
+ setReplicationNodeAttributes(brokerPort, replicationNodeName, attributeMap);
+ }
+
+ public void setReplicationNodeAttributes(int brokerPort, String replicationNodeName, Map<String, Object> attributeMap)
+ throws IOException, JsonGenerationException, JsonMappingException, Exception
+ {
+ RestTestHelper restHelper = createRestTestHelper(brokerPort);
int status = restHelper.submitRequest("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName , "PUT", attributeMap);
if (status != 200)
{
@@ -396,6 +408,11 @@ public class HATestClusterCreator
public Map<String, Object> getReplicationNodeAttributes(int brokerPort) throws Exception
{
String replicationNodeName = getNodeNameForBrokerPort(brokerPort);
+ return getReplicationNodeAttributes(brokerPort, replicationNodeName);
+ }
+
+ public Map<String, Object> getReplicationNodeAttributes(int brokerPort, String replicationNodeName) throws IOException
+ {
RestTestHelper restHelper = createRestTestHelper(brokerPort);
return restHelper.getJsonAsSingletonList("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName );
}
@@ -406,4 +423,20 @@ public class HATestClusterCreator
return RestTestHelper.createRestTestHelperWithDefaultCredentials(httpPort);
}
+ public void awaitNodeToAttainRole(int brokerPort, String nodeName, String desiredRole) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ Map<String, Object> data = Collections.emptyMap();
+
+ while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
+ {
+ LOGGER.debug("Awaiting node to transit into " + desiredRole + " role");
+ data = getReplicationNodeAttributes(brokerPort, nodeName);
+ if (!desiredRole.equals(data.get(ReplicationNode.ROLE)))
+ {
+ Thread.sleep(1000);
+ }
+ }
+ Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE));
+ }
}