diff options
author | Keith Wall <kwall@apache.org> | 2014-05-21 11:05:21 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-05-21 11:05:21 +0000 |
commit | 18db9663c7c73af9d1dc5a82478419f56f597851 (patch) | |
tree | b8bf591fdfd85ede99bcdd6eea89766e36b355a9 /qpid/java | |
parent | 6619d55f7d03e67df6f9f2956779c33d1de66776 (diff) | |
download | qpid-python-18db9663c7c73af9d1dc5a82478419f56f597851.tar.gz |
QPID-5715: [Java Broker] Make virtualhosts respect the states ACTIVE and STOPPED
* Add state transition tests for BDBHA virtualhostnode / virtualhost
* Prevent the BDBVHN activating the VH (this is now a responsibility of attain desired state)
* BDBHARemoteReplicationNode use state UNAVAILABLE in the case where the remote node is not MASTER or REPLICA.
Work by Andrew MacBean <andymacbean@gmail.com> and me.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1596536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 158 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 20c80ad765..9c7fa65928 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -21,12 +21,14 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; +import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; + import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.je.rep.MasterStateException; -import com.sleepycat.je.rep.ReplicatedEnvironment; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -103,7 +105,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB super.deleted(); } - @StateTransition(currentState = {State.ACTIVE, State.STOPPED}, desiredState = State.DELETED) + @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED) private void doDelete() { String nodeName = getName(); @@ -159,13 +161,13 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB if (changedAttributes.contains(ROLE)) { String currentRole = getRole(); - if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + if (!REPLICA.name().equals(currentRole)) { throw new IllegalArgumentException("Cannot transfer mastership when not a replica"); } - if (!ReplicatedEnvironment.State.MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole())) + if (!MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole())) { - throw new IllegalArgumentException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported"); + throw new IllegalArgumentException("Changing role to other value then " + MASTER.name() + " is unsupported"); } } @@ -183,6 +185,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB void setRole(String role) { _role = role; + updateModelStateFromRole(role); } void setJoinTime(long joinTime) @@ -195,4 +198,15 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB _lastTransactionId = lastTransactionId; } + private void updateModelStateFromRole(final String role) + { + State currentState = _state.get(); + if (currentState == State.DELETED) + { + return; + } + + boolean isActive = MASTER.name().equals(role) || REPLICA.name().equals(role); + _state.compareAndSet(currentState, isActive ? State.ACTIVE : State.UNAVAILABLE); + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 4061da177b..d162a43834 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -330,7 +330,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) + @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) protected void doStop() { try @@ -441,7 +441,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } }); } - host.start(); } catch (Exception e) @@ -707,6 +706,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override + protected void onCreate() + { + // Do not persist replica virtualhost + } + + @Override protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, final Map<String, Object> attributes, final ConfiguredObject... otherParents) diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index eb7cc03387..fb8d972fb3 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -143,16 +143,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase VirtualHostNode<?> node = createHaVHN(attributes); final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); - final CountDownLatch virtualHostStateChangeLatch = new CountDownLatch(1); node.addChangeListener(new ConfigurationChangeListener() { @Override public void stateChanged(ConfiguredObject object, State oldState, State newState) { - if (object instanceof VirtualHost) - { - virtualHostStateChangeLatch.countDown(); - } } @Override @@ -195,7 +190,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); - assertTrue("Virtual host child has not had a state change", virtualHostStateChangeLatch.await(30, TimeUnit.SECONDS)); VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); assertNotNull("Virtual host child was not added", virtualHost); assertEquals("Unexpected virtual host name", groupName, virtualHost.getName()); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java index 675b052a91..7f8f3ad22c 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java @@ -29,12 +29,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; +import org.apache.qpid.systest.rest.Asserts; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.test.utils.TestBrokerConfiguration; import org.apache.qpid.util.FileUtils; @@ -106,6 +110,69 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase assertRemoteNodes(NODE1, NODE2, NODE3); } + public void testMutateStateOfOneNode() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + mutateDesiredState(node1Url, "STOPPED"); + + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2); + assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size()); + + Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1); + + assertEquals("Node 1 observed from node 2 is in the wrong state", + "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE)); + assertEquals("Node 1 observed from node 2 has the wrong role", + "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE)); + + } + + public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception + { + createHANode(NODE1, _node1HaPort, _node1HaPort); + createHANode(NODE2, _node2HaPort, _node1HaPort); + createHANode(NODE3, _node3HaPort, _node1HaPort); + + String node1Url = _baseNodeRestUrl + NODE1; + String node2Url = _baseNodeRestUrl + NODE2; + String node3Url = _baseNodeRestUrl + NODE3; + + assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE"); + assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE"); + + // Put virtualhost in STOPPED state + String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName; + assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE"); + mutateDesiredState(virtualHostRestUrl, "STOPPED"); + assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED"); + + // Now stop node 1 to cause an election between nodes 2 & 3 + mutateDesiredState(node1Url, "STOPPED"); + assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED"); + + Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url); + + //Check the virtual host of the new master is in the stopped state + String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName; + assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED"); + } + public void testDeleteReplicaNode() throws Exception { createHANode(NODE1, _node1HaPort, _node1HaPort); @@ -128,6 +195,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase { Thread.sleep(100l); } + counter++; } assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size()); } @@ -167,6 +235,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase { Thread.sleep(100l); } + counter++; } assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size()); } @@ -259,4 +328,61 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase assertNotNull("Node " + name + " has unexpected joinTime", joinTime); assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0); } + + private void assertActualAndDesiredStates(final String restUrl, + final String expectedDesiredState, + final String expectedActualState) throws IOException + { + Map<String, Object> objectData = getRestTestHelper().getJsonAsSingletonList(restUrl); + Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData); + } + + private void mutateDesiredState(final String restUrl, final String newState) throws IOException + { + Map<String, Object> newAttributes = new HashMap<String, Object>(); + newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); + + getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK); + } + + private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName) + { + Map<String, Object> foundNode = null; + for (Map<String, Object> remoteNode : remoteNodes) + { + if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME))) + { + foundNode = remoteNode; + break; + } + } + assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes."); + return foundNode; + } + + private Map<String, Object> awaitNewMaster(final String... nodeUrls) + throws IOException, InterruptedException + { + Map<String, Object> newMasterData = null; + int counter = 0; + while (newMasterData == null && counter < 50) + { + for(String nodeUrl: nodeUrls) + { + Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl); + if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE))) + { + newMasterData = nodeData; + break; + } + } + if (newMasterData == null) + { + Thread.sleep(100l); + counter++; + } + } + assertNotNull("Could not find new master", newMasterData); + return newMasterData; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 2bec380820..86c19941f8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -1209,7 +1209,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _housekeepingThreadCount; } - @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) + @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) protected void doStop() { closeChildren(); @@ -1219,7 +1219,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) + @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) private void doDelete() { if(_deleted.compareAndSet(false,true)) @@ -1400,7 +1400,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } - @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE ) + @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java index 214a961b00..2d5f083380 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java @@ -94,10 +94,10 @@ public class VirtualHostNodeRestTest extends QpidRestTestCase assertActualAndDesireStates(restUrl, "ACTIVE", "ACTIVE"); - mutateVirtualHostDesiredState(restUrl, "STOPPED"); + mutateVirtualHostNodeDesiredState(restUrl, "STOPPED"); assertActualAndDesireStates(restUrl, "STOPPED", "STOPPED"); - mutateVirtualHostDesiredState(restUrl, "ACTIVE"); + mutateVirtualHostNodeDesiredState(restUrl, "ACTIVE"); assertActualAndDesireStates(restUrl, "ACTIVE", "ACTIVE"); } @@ -145,7 +145,7 @@ public class VirtualHostNodeRestTest extends QpidRestTestCase Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhostNode); } - private void mutateVirtualHostDesiredState(final String restUrl, final String newState) throws IOException + private void mutateVirtualHostNodeDesiredState(final String restUrl, final String newState) throws IOException { Map<String, Object> newAttributes = new HashMap<String, Object>(); newAttributes.put(VirtualHostNode.DESIRED_STATE, newState); |