From 7744edf70c7c320d193115b278bd16a4c5167702 Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Wed, 19 Feb 2014 16:30:23 +0000 Subject: QPID-5409: Restart the former master environment on MASTER transfer git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1569814 13f79535-47bb-0310-9956-ffa450edef68 --- .../replication/ReplicatedEnvironmentFacade.java | 83 ++++++++----- .../ReplicatedEnvironmentFacadeTest.java | 134 ++++++++++++++++++--- .../store/berkeleydb/HAClusterBlackboxTest.java | 6 + 3 files changed, 181 insertions(+), 42 deletions(-) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 253c68bfb7..8e9f7f4b70 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -150,6 +150,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; + private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory) { @@ -229,36 +230,41 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); if (restart) { - if (_state.compareAndSet(State.OPEN, State.RESTARTING)) + tryToRestartEnvironment(dbe); + } + return new AMQStoreException(contextMessage, dbe); + } + + private void tryToRestartEnvironment(final DatabaseException dbe) + { + if (_state.compareAndSet(State.OPEN, State.RESTARTING)) + { + if (dbe != null && LOGGER.isDebugEnabled()) { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); - } + LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); + } - _environmentJobExecutor.execute(new Runnable() + _environmentJobExecutor.execute(new Runnable() + { + @Override + public void run() { - @Override - public void run() + try { - try - { - restartEnvironment(dbe); - } - catch (Exception e) - { - LOGGER.error("Exception on environment restart", e); - } + restartEnvironment(); } - }); + catch (Exception e) + { + LOGGER.error("Exception on environment restart", e); + } + } + }); - } - else - { - LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); - } } - return new AMQStoreException(contextMessage, dbe); + else + { + LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); + } } @Override @@ -379,6 +385,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { listener.stateChange(stateChangeEvent); } + + if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN) + { + tryToRestartEnvironment(null); + } + _lastKnownEnvironmentState = state; } private void reopenDatabases() @@ -726,7 +738,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private void restartEnvironment(DatabaseException dbe) + private void restartEnvironment() { LOGGER.info("Restarting environment"); @@ -983,15 +995,30 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void run() { - String groupName = _configuration.getGroupName(); - if (LOGGER.isDebugEnabled()) + if (_state.get() == State.OPEN) { - LOGGER.debug("Checking for changes in the group " + groupName); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName()); + } + + try + { + detectGroupChangesAndNotify(); + } + catch(DatabaseException e) + { + handleDatabaseException("Exception on replication group check", e); + } } + } + private void detectGroupChangesAndNotify() + { + String groupName = _configuration.getGroupName(); ReplicatedEnvironment env = _environment; ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); - if (env != null && env.isValid()) + if (env != null) { ReplicationGroup group = env.getGroup(); Set nodes = new HashSet(group.getElectableNodes()); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index c32791fd36..9b3f13921e 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ReplicationNode; @@ -48,6 +47,7 @@ import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.Durability; import com.sleepycat.je.Environment; import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationConfig; import com.sleepycat.je.rep.StateChangeEvent; @@ -55,7 +55,6 @@ import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { - private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class); private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; @@ -132,6 +131,123 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertNull("Environment should be null after facade close", e); } + public void testTransferMasterToSelf() throws Exception + { + final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); + final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); + StateChangeListener stateChangeListener = new StateChangeListener(){ + + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + firstNodeReplicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + firstNodeMasterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + final CountDownLatch replicaStateLatch = new CountDownLatch(1); + final CountDownLatch masterStateLatch = new CountDownLatch(1); + StateChangeListener testStateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + replicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + masterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); + + thirdNode.transferMasterToSelfAsynchronously(); + assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); + assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); + } + + public void testTransferMasterAnotherNode() throws Exception + { + final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1); + final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1); + StateChangeListener stateChangeListener = new StateChangeListener(){ + + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + firstNodeReplicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + firstNodeMasterStateLatch.countDown(); + } + } + }; + ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS)); + + int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); + String node1NodeHostPort = "localhost:" + replica1Port; + ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState()); + + int replica2Port = getNextAvailable(replica1Port + 1); + String node2NodeHostPort = "localhost:" + replica2Port; + final CountDownLatch replicaStateLatch = new CountDownLatch(1); + final CountDownLatch masterStateLatch = new CountDownLatch(1); + StateChangeListener testStateChangeListener = new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent event) throws RuntimeException + { + ReplicatedEnvironment.State state = event.getState(); + if (state == ReplicatedEnvironment.State.REPLICA) + { + replicaStateLatch.countDown(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + masterStateLatch.countDown(); + } + } + }; + String thirdNodeName = TEST_NODE_NAME + "_2"; + ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener()); + assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals(3, thirdNode.getNumberOfElectableGroupMembers()); + + firstNode.transferMasterAsynchronously(thirdNodeName); + assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS)); + assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS)); + assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState()); + } + public void testOpenDatabases() throws Exception { EnvironmentFacade ef = createMaster(); @@ -404,7 +520,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testEnvironmentRestartOnInsufficientReplicas() throws Exception { - long startTime = System.currentTimeMillis(); ReplicatedEnvironmentFacade master = createMaster(); @@ -417,9 +532,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase String replica2NodeName = TEST_NODE_NAME + "_2"; String replica2NodeHostPort = "localhost:" + replica2Port; ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort); - - long setUpTime = System.currentTimeMillis(); - LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime)); + String databaseName = "test"; DatabaseConfig dbConfig = createDatabase(master, databaseName); @@ -428,8 +541,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase replica1.close(); replica2.close(); - long closeTime = System.currentTimeMillis(); - LOGGER.debug("XXX Env close Time " + (closeTime - setUpTime)); Environment e = master.getEnvironment(); Database db = master.getOpenDatabase(databaseName); try @@ -441,22 +552,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { master.handleDatabaseException(null, ex); } - long openDatabaseTime = System.currentTimeMillis(); - LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime )); replica1 = addReplica(replica1NodeName, replica1NodeHostPort); replica2 = addReplica(replica2NodeName, replica2NodeHostPort); - long reopenTime = System.currentTimeMillis(); - LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime )); // Need to poll to await the remote node updating itself long timeout = System.currentTimeMillis() + 5000; while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) { Thread.sleep(200); } - long recoverTime = System.currentTimeMillis(); - LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime)); + assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java index 84b8de7be9..c327c9f33a 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java @@ -143,6 +143,9 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE)); assertProducingConsuming(connection); + + String nodeName = _clusterCreator.getNodeNameForBrokerPort(activeBrokerPort); + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA"); } public void testTransferMasterFromRemoteNode() throws Exception @@ -171,7 +174,10 @@ public class HAClusterBlackboxTest extends QpidBrokerTestCase assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE)); assertProducingConsuming(connection); + + _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA"); } + public void testQuorumOverride() throws Exception { final Connection connection = getConnection(_brokerFailoverUrl); -- cgit v1.2.1