summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Alex Rudyy <orudyy@apache.org> 2014-02-19 16:30:23 +0000
committer Alex Rudyy <orudyy@apache.org> 2014-02-19 16:30:23 +0000
commit 7744edf70c7c320d193115b278bd16a4c5167702 (patch)
tree 774c6105b9f8b3df07a64c4dfe45a434ce50291c
parent a1b37fcc9c91b32ec1e4c43df43b5150563d0826 (diff)
download qpid-python-java-broker-bdb-ha.tar.gz
QPID-5409: Restart the former master environment on MASTER transfer [branch: java-broker-bdb-ha]
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1569814 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java 83
-rw-r--r-- qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java 134
-rw-r--r-- qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java 6
3 files changed, 181 insertions, 42 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 253c68bfb7..8e9f7f4b70 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -150,6 +150,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
+ private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
@@ -229,36 +230,41 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
if (restart)
{
- if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+ tryToRestartEnvironment(dbe);
+ }
+ return new AMQStoreException(contextMessage, dbe);
+ }
+
+ private void tryToRestartEnvironment(final DatabaseException dbe)
+ {
+ if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+ {
+ if (dbe != null && LOGGER.isDebugEnabled())
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
- }
+ LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
+ }
- _environmentJobExecutor.execute(new Runnable()
+ _environmentJobExecutor.execute(new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public void run()
+ try
{
- try
- {
- restartEnvironment(dbe);
- }
- catch (Exception e)
- {
- LOGGER.error("Exception on environment restart", e);
- }
+ restartEnvironment();
}
- });
+ catch (Exception e)
+ {
+ LOGGER.error("Exception on environment restart", e);
+ }
+ }
+ });
- }
- else
- {
- LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
- }
}
- return new AMQStoreException(contextMessage, dbe);
+ else
+ {
+ LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+ }
}
@Override
@@ -379,6 +385,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
listener.stateChange(stateChangeEvent);
}
+
+ if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+ {
+ tryToRestartEnvironment(null);
+ }
+ _lastKnownEnvironmentState = state;
}
private void reopenDatabases()
@@ -726,7 +738,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private void restartEnvironment(DatabaseException dbe)
+ private void restartEnvironment()
{
LOGGER.info("Restarting environment");
@@ -983,15 +995,30 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void run()
{
- String groupName = _configuration.getGroupName();
- if (LOGGER.isDebugEnabled())
+ if (_state.get() == State.OPEN)
{
- LOGGER.debug("Checking for changes in the group " + groupName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName());
+ }
+
+ try
+ {
+ detectGroupChangesAndNotify();
+ }
+ catch(DatabaseException e)
+ {
+ handleDatabaseException("Exception on replication group check", e);
+ }
}
+ }
+ private void detectGroupChangesAndNotify()
+ {
+ String groupName = _configuration.getGroupName();
ReplicatedEnvironment env = _environment;
ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
- if (env != null && env.isValid())
+ if (env != null)
{
ReplicationGroup group = env.getGroup();
Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index c32791fd36..9b3f13921e 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
@@ -48,6 +47,7 @@ import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.StateChangeEvent;
@@ -55,7 +55,6 @@ import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
- private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class);
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
@@ -132,6 +131,123 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertNull("Environment should be null after facade close", e);
}
+ public void testTransferMasterToSelf() throws Exception
+ {
+ final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+ StateChangeListener stateChangeListener = new StateChangeListener(){
+
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ firstNodeReplicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ firstNodeMasterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+ assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+ final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch masterStateLatch = new CountDownLatch(1);
+ StateChangeListener testStateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ replicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ masterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
+ assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+ thirdNode.transferMasterToSelfAsynchronously();
+ assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+ assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+ }
+
+ public void testTransferMasterAnotherNode() throws Exception
+ {
+ final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+ StateChangeListener stateChangeListener = new StateChangeListener(){
+
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ firstNodeReplicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ firstNodeMasterStateLatch.countDown();
+ }
+ }
+ };
+ ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+ assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+ int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+ String node1NodeHostPort = "localhost:" + replica1Port;
+ ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+ int replica2Port = getNextAvailable(replica1Port + 1);
+ String node2NodeHostPort = "localhost:" + replica2Port;
+ final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+ final CountDownLatch masterStateLatch = new CountDownLatch(1);
+ StateChangeListener testStateChangeListener = new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent event) throws RuntimeException
+ {
+ ReplicatedEnvironment.State state = event.getState();
+ if (state == ReplicatedEnvironment.State.REPLICA)
+ {
+ replicaStateLatch.countDown();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ masterStateLatch.countDown();
+ }
+ }
+ };
+ String thirdNodeName = TEST_NODE_NAME + "_2";
+ ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
+ assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+ firstNode.transferMasterAsynchronously(thirdNodeName);
+ assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+ assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+ }
+
public void testOpenDatabases() throws Exception
{
EnvironmentFacade ef = createMaster();
@@ -404,7 +520,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
{
- long startTime = System.currentTimeMillis();
ReplicatedEnvironmentFacade master = createMaster();
@@ -417,9 +532,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
String replica2NodeName = TEST_NODE_NAME + "_2";
String replica2NodeHostPort = "localhost:" + replica2Port;
ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
-
- long setUpTime = System.currentTimeMillis();
- LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime));
+
String databaseName = "test";
DatabaseConfig dbConfig = createDatabase(master, databaseName);
@@ -428,8 +541,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
replica1.close();
replica2.close();
- long closeTime = System.currentTimeMillis();
- LOGGER.debug("XXX Env close Time " + (closeTime - setUpTime));
Environment e = master.getEnvironment();
Database db = master.getOpenDatabase(databaseName);
try
@@ -441,22 +552,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
master.handleDatabaseException(null, ex);
}
- long openDatabaseTime = System.currentTimeMillis();
- LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime ));
replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
- long reopenTime = System.currentTimeMillis();
- LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime ));
// Need to poll to await the remote node updating itself
long timeout = System.currentTimeMillis() + 5000;
while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
{
Thread.sleep(200);
}
- long recoverTime = System.currentTimeMillis();
- LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime));
+
assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
index 84b8de7be9..c327c9f33a 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
@@ -143,6 +143,9 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
assertProducingConsuming(connection);
+
+ String nodeName = _clusterCreator.getNodeNameForBrokerPort(activeBrokerPort);
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
}
public void testTransferMasterFromRemoteNode() throws Exception
@@ -171,7 +174,10 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase
assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
assertProducingConsuming(connection);
+
+ _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
}
+
public void testQuorumOverride() throws Exception
{
final Connection connection = getConnection(_brokerFailoverUrl);