summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-05-21 11:05:21 +0000
committerKeith Wall <kwall@apache.org>2014-05-21 11:05:21 +0000
commit18db9663c7c73af9d1dc5a82478419f56f597851 (patch)
treeb8bf591fdfd85ede99bcdd6eea89766e36b355a9 /qpid/java
parent6619d55f7d03e67df6f9f2956779c33d1de66776 (diff)
downloadqpid-python-18db9663c7c73af9d1dc5a82478419f56f597851.tar.gz
QPID-5715: [Java Broker] Make virtualhosts respect the states ACTIVE and STOPPED
* Add state transition tests for BDBHA virtualhostnode / virtualhost * Prevent the BDBVHN activating the VH (this is now a responsibility of attain desired state) * BDBHARemoteReplicationNode use state UNAVAILABLE in the case where the remote node is not MASTER or REPLICA. Work by Andrew MacBean <andymacbean@gmail.com> and me. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1596536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java24
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java9
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java6
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java126
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java6
6 files changed, 158 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index 20c80ad765..9c7fa65928 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -21,12 +21,14 @@
package org.apache.qpid.server.virtualhostnode.berkeleydb;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
+import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
+
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.je.rep.MasterStateException;
-import com.sleepycat.je.rep.ReplicatedEnvironment;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -103,7 +105,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
super.deleted();
}
- @StateTransition(currentState = {State.ACTIVE, State.STOPPED}, desiredState = State.DELETED)
+ @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
private void doDelete()
{
String nodeName = getName();
@@ -159,13 +161,13 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
if (changedAttributes.contains(ROLE))
{
String currentRole = getRole();
- if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ if (!REPLICA.name().equals(currentRole))
{
throw new IllegalArgumentException("Cannot transfer mastership when not a replica");
}
- if (!ReplicatedEnvironment.State.MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole()))
+ if (!MASTER.name().equals(((BDBHARemoteReplicationNode<?>)proxyForValidation).getRole()))
{
- throw new IllegalArgumentException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+ throw new IllegalArgumentException("Changing role to other value then " + MASTER.name() + " is unsupported");
}
}
@@ -183,6 +185,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
void setRole(String role)
{
_role = role;
+ updateModelStateFromRole(role);
}
void setJoinTime(long joinTime)
@@ -195,4 +198,15 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
_lastTransactionId = lastTransactionId;
}
+ private void updateModelStateFromRole(final String role)
+ {
+ State currentState = _state.get();
+ if (currentState == State.DELETED)
+ {
+ return;
+ }
+
+ boolean isActive = MASTER.name().equals(role) || REPLICA.name().equals(role);
+ _state.compareAndSet(currentState, isActive ? State.ACTIVE : State.UNAVAILABLE);
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 4061da177b..d162a43834 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -330,7 +330,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
+ @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected void doStop()
{
try
@@ -441,7 +441,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
});
}
- host.start();
}
catch (Exception e)
@@ -707,6 +706,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
+ protected void onCreate()
+ {
+ // Do not persist replica virtualhost
+ }
+
+ @Override
protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
final Map<String, Object> attributes,
final ConfiguredObject... otherParents)
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index eb7cc03387..fb8d972fb3 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -143,16 +143,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
VirtualHostNode<?> node = createHaVHN(attributes);
final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
- final CountDownLatch virtualHostStateChangeLatch = new CountDownLatch(1);
node.addChangeListener(new ConfigurationChangeListener()
{
@Override
public void stateChanged(ConfiguredObject object, State oldState, State newState)
{
- if (object instanceof VirtualHost)
- {
- virtualHostStateChangeLatch.countDown();
- }
}
@Override
@@ -195,7 +190,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
- assertTrue("Virtual host child has not had a state change", virtualHostStateChangeLatch.await(30, TimeUnit.SECONDS));
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host child was not added", virtualHost);
assertEquals("Unexpected virtual host name", groupName, virtualHost.getName());
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
index 675b052a91..7f8f3ad22c 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
@@ -29,12 +29,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systest.rest.Asserts;
import org.apache.qpid.systest.rest.QpidRestTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.util.FileUtils;
@@ -106,6 +110,69 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
assertRemoteNodes(NODE1, NODE2, NODE3);
}
+ public void testMutateStateOfOneNode() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ String node1Url = _baseNodeRestUrl + NODE1;
+ String node2Url = _baseNodeRestUrl + NODE2;
+ String node3Url = _baseNodeRestUrl + NODE3;
+
+ assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ mutateDesiredState(node1Url, "STOPPED");
+
+ assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("replicationnode/" + NODE2);
+ assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
+
+ Map<String, Object> remoteNode1 = findRemoteNodeByName(remoteNodes, NODE1);
+
+ assertEquals("Node 1 observed from node 2 is in the wrong state",
+ "UNAVAILABLE", remoteNode1.get(BDBHARemoteReplicationNode.STATE));
+ assertEquals("Node 1 observed from node 2 has the wrong role",
+ "UNKNOWN", remoteNode1.get(BDBHARemoteReplicationNode.ROLE));
+
+ }
+
+ public void testNewMasterElectedWhenVirtualHostIsStopped() throws Exception
+ {
+ createHANode(NODE1, _node1HaPort, _node1HaPort);
+ createHANode(NODE2, _node2HaPort, _node1HaPort);
+ createHANode(NODE3, _node3HaPort, _node1HaPort);
+
+ String node1Url = _baseNodeRestUrl + NODE1;
+ String node2Url = _baseNodeRestUrl + NODE2;
+ String node3Url = _baseNodeRestUrl + NODE3;
+
+ assertActualAndDesiredStates(node1Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node2Url, "ACTIVE", "ACTIVE");
+ assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
+
+ // Put virtualhost in STOPPED state
+ String virtualHostRestUrl = "virtualhost/" + NODE1 + "/" + _hostName;
+ assertActualAndDesiredStates(virtualHostRestUrl, "ACTIVE", "ACTIVE");
+ mutateDesiredState(virtualHostRestUrl, "STOPPED");
+ assertActualAndDesiredStates(virtualHostRestUrl, "STOPPED", "STOPPED");
+
+ // Now stop node 1 to cause an election between nodes 2 & 3
+ mutateDesiredState(node1Url, "STOPPED");
+ assertActualAndDesiredStates(node1Url, "STOPPED", "STOPPED");
+
+ Map<String, Object> newMasterData = awaitNewMaster(node2Url, node3Url);
+
+ //Check the virtual host of the new master is in the stopped state
+ String newMasterVirtualHostRestUrl = "virtualhost/" + newMasterData.get(BDBHAVirtualHostNode.NAME) + "/" + _hostName;
+ assertActualAndDesiredStates(newMasterVirtualHostRestUrl, "STOPPED", "STOPPED");
+ }
+
public void testDeleteReplicaNode() throws Exception
{
createHANode(NODE1, _node1HaPort, _node1HaPort);
@@ -128,6 +195,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
{
Thread.sleep(100l);
}
+ counter++;
}
assertEquals("Unexpected number of remote nodes on " + NODE1, 1, data.size());
}
@@ -167,6 +235,7 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
{
Thread.sleep(100l);
}
+ counter++;
}
assertEquals("Unexpected number of remote nodes on " + NODE2, 1, data.size());
}
@@ -259,4 +328,61 @@ public class BDBHAVirtualHostNodeRestTest extends QpidRestTestCase
assertNotNull("Node " + name + " has unexpected joinTime", joinTime);
assertTrue("Node " + name + " has unexpected joinTime " + joinTime, joinTime > 0);
}
+
+ private void assertActualAndDesiredStates(final String restUrl,
+ final String expectedDesiredState,
+ final String expectedActualState) throws IOException
+ {
+ Map<String, Object> objectData = getRestTestHelper().getJsonAsSingletonList(restUrl);
+ Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, objectData);
+ }
+
+ private void mutateDesiredState(final String restUrl, final String newState) throws IOException
+ {
+ Map<String, Object> newAttributes = new HashMap<String, Object>();
+ newAttributes.put(VirtualHostNode.DESIRED_STATE, newState);
+
+ getRestTestHelper().submitRequest(restUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
+ }
+
+ private Map<String, Object> findRemoteNodeByName(final List<Map<String, Object>> remoteNodes, final String nodeName)
+ {
+ Map<String, Object> foundNode = null;
+ for (Map<String, Object> remoteNode : remoteNodes)
+ {
+ if (nodeName.equals(remoteNode.get(RemoteReplicationNode.NAME)))
+ {
+ foundNode = remoteNode;
+ break;
+ }
+ }
+ assertNotNull("Could not find node with name " + nodeName + " amongst remote nodes.");
+ return foundNode;
+ }
+
+ private Map<String, Object> awaitNewMaster(final String... nodeUrls)
+ throws IOException, InterruptedException
+ {
+ Map<String, Object> newMasterData = null;
+ int counter = 0;
+ while (newMasterData == null && counter < 50)
+ {
+ for(String nodeUrl: nodeUrls)
+ {
+ Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(nodeUrl);
+ if ("MASTER".equals(nodeData.get(BDBHAVirtualHostNode.ROLE)))
+ {
+ newMasterData = nodeData;
+ break;
+ }
+ }
+ if (newMasterData == null)
+ {
+ Thread.sleep(100l);
+ counter++;
+ }
+ }
+ assertNotNull("Could not find new master", newMasterData);
+ return newMasterData;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 2bec380820..86c19941f8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1209,7 +1209,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _housekeepingThreadCount;
}
- @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED )
+ @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected void doStop()
{
closeChildren();
@@ -1219,7 +1219,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
+ @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
private void doDelete()
{
if(_deleted.compareAndSet(false,true))
@@ -1400,7 +1400,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
- @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE )
+ @StateTransition( currentState = { State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
private void onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java
index 214a961b00..2d5f083380 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostNodeRestTest.java
@@ -94,10 +94,10 @@ public class VirtualHostNodeRestTest extends QpidRestTestCase
assertActualAndDesireStates(restUrl, "ACTIVE", "ACTIVE");
- mutateVirtualHostDesiredState(restUrl, "STOPPED");
+ mutateVirtualHostNodeDesiredState(restUrl, "STOPPED");
assertActualAndDesireStates(restUrl, "STOPPED", "STOPPED");
- mutateVirtualHostDesiredState(restUrl, "ACTIVE");
+ mutateVirtualHostNodeDesiredState(restUrl, "ACTIVE");
assertActualAndDesireStates(restUrl, "ACTIVE", "ACTIVE");
}
@@ -145,7 +145,7 @@ public class VirtualHostNodeRestTest extends QpidRestTestCase
Asserts.assertActualAndDesiredState(expectedDesiredState, expectedActualState, virtualhostNode);
}
- private void mutateVirtualHostDesiredState(final String restUrl, final String newState) throws IOException
+ private void mutateVirtualHostNodeDesiredState(final String restUrl, final String newState) throws IOException
{
Map<String, Object> newAttributes = new HashMap<String, Object>();
newAttributes.put(VirtualHostNode.DESIRED_STATE, newState);