diff options
author | Keith Wall <kwall@apache.org> | 2014-02-05 15:40:50 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-02-05 15:40:50 +0000 |
commit | 4d7e126fd91bb6024840cb716e89f358cf8b755b (patch) | |
tree | 36b9a314f311e05d4efeb55d7754c9065b39e636 | |
parent | a2ff3ed02e5c94c7b669a1be79505d831c017a19 (diff) | |
download | qpid-python-4d7e126fd91bb6024840cb716e89f358cf8b755b.tar.gz |
QPID-5409: Add functionality into RemoteReplicationNode to change role attribute from REPLICA to MASTER
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564813 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 299 insertions, 23 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java index 06fffddc0a..594dd8978f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java @@ -21,11 +21,18 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.io.IOException; +import java.lang.reflect.Type; import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.LifetimePolicy; @@ -36,9 +43,12 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AbstractAdapter; import org.apache.qpid.server.model.adapter.NoStatistics; +import org.apache.qpid.server.util.MapValueConverter; import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.util.DbPing; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; /** @@ -48,23 +58,32 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio { private static final Logger LOGGER = Logger.getLogger(RemoteReplicationNode.class); + @SuppressWarnings("serial") + private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>() + {{ + put(ROLE, String.class); + }}; + private final com.sleepycat.je.rep.ReplicationNode _replicationNode; - private final VirtualHost _virtualHost; private final String _hostPort; private final String _groupName; + private final DbPing _dbPing; + private final ReplicationGroupAdmin _replicationGroupAdmin; private volatile String _role; private volatile long _joinTime; private volatile long _lastTransactionId; - public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, TaskExecutor taskExecutor) + public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, + TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin) { super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor); addParent(VirtualHost.class, virtualHost); _groupName = groupName; _hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort(); _replicationNode = replicationNode; - _virtualHost = virtualHost; + _dbPing = dbPing; + _replicationGroupAdmin = admin; } @Override @@ -154,7 +173,6 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio } } - @Override public Object getAttribute(String name) { @@ -207,15 +225,13 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio public void updateNodeState() { - Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT); - DbPing ping = new DbPing(_replicationNode, _groupName, monitorTimeout.intValue()); String oldRole = _role; long oldJoinTime = _joinTime; long oldTransactionId = _lastTransactionId; try { - NodeState state = ping.getNodeState(); + NodeState state = _dbPing.getNodeState(); _role = state.getNodeState().name(); _joinTime = state.getJoinTime(); _lastTransactionId = state.getCurrentTxnEndVLSN(); @@ -252,4 +268,73 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio { return ReplicationNode.AVAILABLE_ATTRIBUTES; } + + @Override + public void changeAttributes(Map<String, Object> attributes) + throws IllegalStateException, AccessControlException, + IllegalArgumentException + { + checkWhetherImmutableAttributeChanged(attributes); + Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); + + if (convertedAttributes.containsKey(ROLE)) + { + String currentRole = (String)getAttribute(ROLE); + if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + { + throw new IllegalConfigurationException("Cannot transfer mastership when not a replica"); + } + + String role = (String)convertedAttributes.get(ROLE); + + if (ReplicatedEnvironment.State.MASTER.name().equals(role) ) + { + try + { + String nodeName = getName(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Trying to transfer master to " + nodeName); + } + + _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("The mastership has been transfered to " + nodeName); + } + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e); + } + } + else + { + throw new IllegalConfigurationException("Changing role to other value then " + + ReplicatedEnvironment.State.MASTER.name() + " is unsupported"); + } + } + + super.changeAttributes(convertedAttributes); + } + + private void checkWhetherImmutableAttributeChanged(Map<String, Object> attributes) + { + Set<String> immutableAttributeNames = new HashSet<String>(getAttributeNames()); + immutableAttributeNames.remove(ROLE); + for (String attributeName : immutableAttributeNames) + { + if (attributes.containsKey(attributeName)) + { + // the name is appended into attributes map in REST layer + if (attributeName.equals(NAME) && getName().equals(attributes.get(NAME))) + { + continue; + } + throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName); + } + } + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 26651ac64c..afbe1e2187 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,6 +33,8 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.rep.util.DbPing; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; //TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class? public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -82,7 +85,13 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact attributes.put(ReplicationNode.NAME, replicationNode.getName()); attributes.put(ReplicationNode.GROUP_NAME, groupName); attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort()); - return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor()); + + Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT); + DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue()); + + ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress())); + + return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin); } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java new file mode 100644 index 0000000000..f65fecd201 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java @@ -0,0 +1,133 @@ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.apache.qpid.server.model.ReplicationNode.*; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.ReplicatedEnvironment.State; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.util.DbPing; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +public class RemoteReplicationNodeTest extends QpidTestCase +{ + + private RemoteReplicationNode _node; + private String _groupName; + private VirtualHost _virtualHost; + private TaskExecutor _taskExecutor; + private ReplicationNode _replicationNode; + private String _nodeName; + private int _port; + private DbPing _dbPing; + private ReplicationGroupAdmin _remoteReplicationAdmin; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _groupName = getTestName(); + _nodeName = getTestName() + "Name"; + _port = 5000; + _replicationNode = mock(ReplicationNode.class); + _virtualHost = mock(VirtualHost.class); + _taskExecutor = mock(TaskExecutor.class); + _dbPing = mock(DbPing.class); + _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class); + + when(_taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_replicationNode.getName()).thenReturn(_nodeName); + when(_replicationNode.getHostName()).thenReturn("localhost"); + when(_replicationNode.getPort()).thenReturn(_port); + + _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin); + } + + public void testGetAttribute() throws Exception + { + State state = State.MASTER; + long joinTime = System.currentTimeMillis(); + long currentTxnEndVLSN = 3; + + updateNodeState(state, joinTime, currentTxnEndVLSN); + + assertEquals("Unexpected name", _nodeName, _node.getAttribute(NAME)); + assertEquals("Unexpected group name", _groupName, _node.getAttribute(GROUP_NAME)); + assertEquals("Unexpected state", state.name(), _node.getAttribute(ROLE)); + assertEquals("Unexpected transaction id", currentTxnEndVLSN, _node.getAttribute(LAST_KNOWN_REPLICATION_TRANSACTION_ID)); + assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME)); + } + + @SuppressWarnings("unchecked") + public void testSetRoleAttribute() throws Exception + { + updateNodeState(); + _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name())); + when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName); + + verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class)); + } + + public void testSetImmutableAttributesThrowException() throws Exception + { + Map<String, Object> changeAttributeMap = new HashMap<String, Object>(); + changeAttributeMap.put(GROUP_NAME, "newGroupName"); + changeAttributeMap.put(HELPER_HOST_PORT, "newhost:1234"); + changeAttributeMap.put(HOST_PORT, "newhost:1234"); + changeAttributeMap.put(COALESCING_SYNC, Boolean.FALSE); + changeAttributeMap.put(DURABILITY, "durability"); + changeAttributeMap.put(JOIN_TIME, 1000l); + changeAttributeMap.put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, 10001l); + changeAttributeMap.put(NAME, "newName"); + changeAttributeMap.put(STORE_PATH, "/not/used"); + changeAttributeMap.put(PARAMETERS, Collections.emptyMap()); + changeAttributeMap.put(REPLICATION_PARAMETERS, Collections.emptyMap()); + + for (Entry<String, Object> entry : changeAttributeMap.entrySet()) + { + assertSetAttributesThrowsException(entry.getKey(), entry.getValue()); + } + } + + private void assertSetAttributesThrowsException(String attributeName, Object attributeValue) throws Exception + { + updateNodeState(); + + try + { + _node.setAttributes(Collections.<String, Object>singletonMap(attributeName, attributeValue)); + fail("Operation to change attribute '" + attributeName + "' should fail"); + } + catch(IllegalConfigurationException e) + { + // pass + } + } + + private void updateNodeState() throws Exception + { + updateNodeState( State.REPLICA, System.currentTimeMillis(), 3); + } + + private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception + { + NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0); + when(_dbPing.getNodeState()).thenReturn(nodeState); + _node.updateNodeState(); + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 995af791e9..84b8de7be9 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -120,7 +120,7 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - public void testTransferMaster() throws Exception + public void testTransferMasterFromLocalNode() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); @@ -145,6 +145,33 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase assertProducingConsuming(connection); } + public void testTransferMasterFromRemoteNode() throws Exception + { + final Connection connection = getConnection(_brokerFailoverUrl); + + ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener); + + final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + String nodeName = _clusterCreator.getNodeNameForBrokerPort(inactiveBrokerPort); + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA"); + Map<String, Object> attributes = _clusterCreator.getReplicationNodeAttributes(activeBrokerPort, nodeName); + assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(ReplicationNode.ROLE)); + + _clusterCreator.setReplicationNodeAttributes(activeBrokerPort, nodeName, Collections.<String, Object>singletonMap(ReplicationNode.ROLE, "MASTER")); + + _failoverAwaitingListener.assertFailoverOccurs(20000); + LOGGER.info("Listener has finished"); + + attributes = _clusterCreator.getReplicationNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE)); + + assertProducingConsuming(connection); + } public void testQuorumOverride() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java index 2b4b8beb9d..0031b024ba 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -221,19 +221,8 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase private void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception { - final long startTime = System.currentTimeMillis(); - Map<String, Object> data = Collections.emptyMap(); - - while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) - { - _logger.debug("Awaiting node to transit into " + desiredRole + " role"); - data = _clusterCreator.getReplicationNodeAttributes(brokerPort); - if (!desiredRole.equals(data.get(ReplicationNode.ROLE))) - { - Thread.sleep(1000); - } - } - assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE)); + String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort); + _clusterCreator.awaitNodeToAttainRole(brokerPort, nodeName, desiredRole); } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 71a0373798..824eaf0a3d 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -20,8 +20,10 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -39,6 +41,8 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import junit.framework.Assert; + import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionURL; @@ -50,6 +54,8 @@ import org.apache.qpid.systest.rest.RestTestHelper; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.url.URLSyntaxException; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; public class HATestClusterCreator { @@ -384,8 +390,14 @@ public class HATestClusterCreator public void setReplicationNodeAttributes(int brokerPort, Map<String, Object> attributeMap) throws Exception { - RestTestHelper restHelper = createRestTestHelper(brokerPort); String replicationNodeName = getNodeNameForBrokerPort(brokerPort); + setReplicationNodeAttributes(brokerPort, replicationNodeName, attributeMap); + } + + public void setReplicationNodeAttributes(int brokerPort, String replicationNodeName, Map<String, Object> attributeMap) + throws IOException, JsonGenerationException, JsonMappingException, Exception + { + RestTestHelper restHelper = createRestTestHelper(brokerPort); int status = restHelper.submitRequest("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName , "PUT", attributeMap); if (status != 200) { @@ -396,6 +408,11 @@ public class HATestClusterCreator public Map<String, Object> getReplicationNodeAttributes(int brokerPort) throws Exception { String replicationNodeName = getNodeNameForBrokerPort(brokerPort); + return getReplicationNodeAttributes(brokerPort, replicationNodeName); + } + + public Map<String, Object> getReplicationNodeAttributes(int brokerPort, String replicationNodeName) throws IOException + { RestTestHelper restHelper = createRestTestHelper(brokerPort); return restHelper.getJsonAsSingletonList("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName ); } @@ -406,4 +423,20 @@ public class HATestClusterCreator return RestTestHelper.createRestTestHelperWithDefaultCredentials(httpPort); } + public void awaitNodeToAttainRole(int brokerPort, String nodeName, String desiredRole) throws Exception + { + final long startTime = System.currentTimeMillis(); + Map<String, Object> data = Collections.emptyMap(); + + while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000) + { + LOGGER.debug("Awaiting node to transit into " + desiredRole + " role"); + data = getReplicationNodeAttributes(brokerPort, nodeName); + if (!desiredRole.equals(data.get(ReplicationNode.ROLE))) + { + Thread.sleep(1000); + } + } + Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE)); + } } |