summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-05 15:41:01 +0000
committerKeith Wall <kwall@apache.org>2014-02-05 15:41:01 +0000
commitfc39d88fa07a9558e94f1d751e578d7b30c0bbad (patch)
tree803fd2c81a54b28b30fd8dff24d534d0c30eb879
parent4d7e126fd91bb6024840cb716e89f358cf8b755b (diff)
downloadqpid-python-fc39d88fa07a9558e94f1d751e578d7b30c0bbad.tar.gz
QPID-5409: Add functionality to delete remote node
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564814 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java13
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java14
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java76
3 files changed, 79 insertions, 24 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
index 594dd8978f..a1785703b1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
@@ -167,10 +167,19 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
{
return true;
}
- else
+ else if (desiredState == State.DELETED)
{
- return false;
+ if (ReplicatedEnvironment.State.REPLICA.name().equals(getAttribute(ROLE)) )
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deleting node " + _groupName + ":" + getName());
+ }
+ _replicationGroupAdmin.removeMember(getName());
+ return true;
+ }
}
+ return false;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 287368f870..391bcc9bd1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -20,16 +20,7 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
-import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
-import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
-import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
-import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
-import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
import static org.apache.qpid.server.model.ReplicationNode.*;
-import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import java.io.File;
import java.net.InetSocketAddress;
@@ -368,13 +359,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- StateChangeListener listener = _stateChangeListener.get();
-
if (state == ReplicatedEnvironment.State.MASTER)
{
reopenDatabases();
}
+ StateChangeListener listener = _stateChangeListener.get();
if (listener != null)
{
listener.stateChange(stateChangeEvent);
@@ -981,6 +971,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
_remoteReplicationNodes.put(discoveredNodeName, remoteNode);
+
if (replicationGroupListener != null)
{
replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode);
@@ -1123,5 +1114,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
-
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 1bfc4b3b20..6090c79bd1 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
-import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
-import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
-import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
-import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.NAME;
-import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
import static org.apache.qpid.server.model.ReplicationNode.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.replication.ReplicationGroupListener;
@@ -85,12 +78,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private VirtualHost _virtualHost = mock(VirtualHost.class);
+
private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost);
public void setUp() throws Exception
{
super.setUp();
+ TaskExecutor taskExecutor = mock(TaskExecutor.class);
+ when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ when(_virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
+
_storePath = TestFileUtils.createTestDirectory("bdb", true);
when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
@@ -518,6 +516,66 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Node made unknown an unexpected number of times", 1, unknownStateChangeCount.get());
}
+ public void testEnvironmentFacadeDetectsRemovalOfRemoteNode() throws Exception
+ {
+ final CountDownLatch nodeRemovedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeAddedLatch = new CountDownLatch(1);
+ final AtomicReference<ReplicationNode> addedNodeRef = new AtomicReference<ReplicationNode>();
+ final AtomicReference<ReplicationNode> removedNodeRef = new AtomicReference<ReplicationNode>();
+ ReplicationGroupListener listener = new NoopReplicationGroupListener()
+ {
+ @Override
+ public void onReplicationNodeAddedToGroup(ReplicationNode node)
+ {
+ if (addedNodeRef.compareAndSet(null, node))
+ {
+ nodeAddedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
+ {
+ removedNodeRef.set(node);
+ nodeRemovedLatch.countDown();
+ }
+ };
+
+ TestStateChangeListener stateChangeListener = new TestStateChangeListener(State.MASTER);
+ final ReplicatedEnvironmentFacade masterEnvironment = addNode(State.MASTER, stateChangeListener, listener);
+ assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
+
+ masterEnvironment.setDesignatedPrimary(true);
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+
+ String replicaName = TEST_NODE_NAME + "_1";
+ addReplica(replicaName, node1NodeHostPort);
+
+ assertTrue("Node should be added", nodeAddedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+
+ ReplicationNode node = addedNodeRef.get();
+ assertEquals("Unexpected node name", replicaName, node.getName());
+
+ // Need to poll to await the remote node updating itself
+ long timeout = System.currentTimeMillis() + 5000;
+ while(!State.REPLICA.name().equals(node.getAttribute(ReplicationNode.ROLE)) && System.currentTimeMillis() < timeout)
+ {
+ Thread.sleep(200);
+ }
+ assertEquals("Unexpected node role", State.REPLICA.name(), node.getAttribute(ReplicationNode.ROLE));
+
+ // removing remote node
+ node.setDesiredState(node.getActualState(), org.apache.qpid.server.model.State.DELETED);
+
+ assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
+ assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
+
+ //TODO: need a way to shut down the remote environment when the corresponding remote node is deleted.
+ // It is unclear whether it is possible
+ }
+
public void testCloseStateTransitions() throws Exception
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = createMaster();
@@ -552,7 +610,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
-
LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
@@ -589,7 +646,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
-
// TODO REF contract with LRN is too complicated.
when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort);
when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);