diff options
author | Keith Wall <kwall@apache.org> | 2014-02-05 15:41:01 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-02-05 15:41:01 +0000 |
commit | fc39d88fa07a9558e94f1d751e578d7b30c0bbad (patch) | |
tree | 803fd2c81a54b28b30fd8dff24d534d0c30eb879 | |
parent | 4d7e126fd91bb6024840cb716e89f358cf8b755b (diff) | |
download | qpid-python-fc39d88fa07a9558e94f1d751e578d7b30c0bbad.tar.gz |
QPID-5409: Add functionality to delete remote node
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564814 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 79 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java index 594dd8978f..a1785703b1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java @@ -167,10 +167,19 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio { return true; } - else + else if (desiredState == State.DELETED) { - return false; + if (ReplicatedEnvironment.State.REPLICA.name().equals(getAttribute(ROLE)) ) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deleting node " + _groupName + ":" + getName()); + } + _replicationGroupAdmin.removeMember(getName()); + return true; + } } + return false; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 287368f870..391bcc9bd1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -20,16 +20,7 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; -import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; -import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; -import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; -import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; -import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; -import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; import static org.apache.qpid.server.model.ReplicationNode.*; -import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; import java.io.File; import java.net.InetSocketAddress; @@ -368,13 +359,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - StateChangeListener listener = _stateChangeListener.get(); - if (state == ReplicatedEnvironment.State.MASTER) { reopenDatabases(); } + StateChangeListener listener = _stateChangeListener.get(); if (listener != null) { listener.stateChange(stateChangeEvent); @@ -981,6 +971,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); _remoteReplicationNodes.put(discoveredNodeName, remoteNode); + if (replicationGroupListener != null) { replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode); @@ -1123,5 +1114,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } - } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 1bfc4b3b20..6090c79bd1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -20,14 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; -import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; -import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; -import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; -import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; -import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.NAME; -import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; import static org.apache.qpid.server.model.ReplicationNode.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.replication.ReplicationGroupListener; @@ -85,12 +78,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private File _storePath; private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); private VirtualHost _virtualHost = mock(VirtualHost.class); + private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost); public void setUp() throws Exception { super.setUp(); + TaskExecutor taskExecutor = mock(TaskExecutor.class); + when(taskExecutor.isTaskExecutorThread()).thenReturn(true); + when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor); + _storePath = TestFileUtils.createTestDirectory("bdb", true); when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L); @@ -518,6 +516,66 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get()); } + public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception + { + final CountDownLatch nodeRemovedLatch = new CountDownLatch(1); + final CountDownLatch nodeAddedLatch = new CountDownLatch(1); + final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>(); + final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>(); + ReplicationGroupListener listener = new NoopReplicationGroupListener() + { + @Override + public void onReplicationNodeAddedToGroup(ReplicationNode node) + { + if (addedNodeRef.compareAndSet(null, node)) + { + nodeAddedLatch.countDown(); + } + } + + @Override + public void onReplicationNodeRemovedFromGroup(ReplicationNode node) + { + removedNodeRef.set(node); + nodeRemovedLatch.countDown(); + } + }; + + TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER); + final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener); + assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); + + masterEnvironment.setDesignatedPrimary(true); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + + String replicaName = TEST_NODE_NAME + "_1"; + addReplica(replicaName, node1NodeHostPort); + + assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + + ReplicationNode node = addedNodeRef.get(); + assertEquals("Unexpected node name", replicaName, node.getName()); + + // Need to poll to await the remote node updating itself + long timeout = System.currentTimeMillis() + 5000; + while(!State.REPLICA.name().equals(node.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout) + { + Thread.sleep(200); + } + assertEquals("Unexpected node role", State.REPLICA.name(), node.getAttribute(ReplicationNode.ROLE)); + + // removing remote node + node.setDesiredState(node.getActualState(), org.apache.qpid.server.model.State.DELETED); + + assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("Unexpected node is deleted", node, removedNodeRef.get()); + + //TODO: need a way to shut down the remote environment when the corresponding remote node is deleted. + // It is unclear whether it is possible + } + public void testCloseStateTransitions() throws Exception { ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster(); @@ -552,7 +610,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { - LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory); ref.setReplicationGroupListener(replicationGroupListener); @@ -589,7 +646,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); - // TODO REF contract with LRN is too complicated. when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort); when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); |