diff options
author | Mukti Krishnan <muktikrishnan94@gmail.com> | 2021-03-09 21:46:52 +0530 |
---|---|---|
committer | Mate Szalay-Beko <mszalay@cloudera.com> | 2022-05-17 10:03:09 +0200 |
commit | 161e50574baf6725dec00c547a5005f19588587d (patch) | |
tree | e94da8e4e5168a7d9ca5aedba2ff209b305072f6 | |
parent | f68019ab9c2e0ee362b890e1a7a0a449afda3a7c (diff) | |
download | zookeeper-161e50574baf6725dec00c547a5005f19588587d.tar.gz |
ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.
Based on https://github.com/apache/zookeeper/pull/1224 ; fixed unit test build issue.
Author: Fangmin Lyu <fangmin@apache.org>
Author: Michael Han <hanm@apache.org>
Reviewers: Enrico Olivelli <eolivelli@apache.org>, Originally developed by Fangmin Lyu <fangmin@apache.org>
Closes #1515 from hanm/ZOOKEEPER-3642
(cherry picked from commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a)
3 files changed, 150 insertions, 4 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index a12c1117d..130861354 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -612,6 +612,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { */ public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { + if (fullyShutDown && zkDb != null) { + zkDb.clear(); + } LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 74f9f4ff1..7b22c060e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -687,7 +687,9 @@ public class Learner { closeSocket(); // shutdown previous zookeeper if (zk != null) { - zk.shutdown(); + // If we haven't finished SNAP sync, force fully shutdown + // to avoid potential inconsistency + zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP)); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index ff42204db..a1eeaa3ac 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; import static org.apache.zookeeper.test.ClientBase.createEmptyTestDir; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; @@ -1412,21 +1413,153 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } } + /** + * If learner failed to do SNAP sync with leader before it's writing + * the snapshot to disk, it's possible that it might have DIFF sync + * with new leader or itself being elected as a leader. + * + * This test is trying to guarantee there is no data inconsistency for + * this case. + */ + @Test + public void testDiffSyncAfterSnap() throws Exception { + final int ENSEMBLE_SERVERS = 3; + MainThread[] mt = new MainThread[ENSEMBLE_SERVERS]; + ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS]; + + try { + // 1. start a quorum + final int[] clientPorts = new int[ENSEMBLE_SERVERS]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + + // start servers + Context[] contexts = new Context[ENSEMBLE_SERVERS]; + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + final Context context = new Context(); + contexts[i] = context; + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) { + @Override + public TestQPMain getTestQPMain() { + return new CustomizedQPMain(context); + } + }; + mt[i].start(); + zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this); + } + waitForAll(zk, States.CONNECTED); + LOG.info("all servers started"); + + final String nodePath = "/testDiffSyncAfterSnap"; + + // 2. find leader and a follower + int leaderId = -1; + int followerA = -1; + for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) { + if (mt[i].main.quorumPeer.leader != null) { + leaderId = i; + } else if (followerA == -1) { + followerA = i; + } + } + + // 3. stop follower A + LOG.info("shutdown follower {}", followerA); + mt[followerA].shutdown(); + waitForOne(zk[followerA], States.CONNECTING); + + // 4. issue some traffic + int index = 0; + int numOfRequests = 10; + for (int i = 0; i < numOfRequests; i++) { + zk[leaderId].create(nodePath + index++, + new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer; + + // 5. inject fault to cause the follower exit when received NEWLEADER + contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() { + boolean processed = false; + @Override + public void process() throws IOException { + if (processed) { + return; + } + processed = true; + System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false"); + throw new IOException("read timedout"); + } + }; + + // 6. force snap sync once + LOG.info("force snapshot sync"); + System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true"); + + // 7. start follower A + mt[followerA].start(); + waitForOne(zk[followerA], States.CONNECTED); + LOG.info("verify the nodes are exist in memory"); + for (int i = 0; i < index; i++) { + assertNotNull(zk[followerA].exists(nodePath + i, false)); + } + + // 8. issue another request which will be persisted on disk + zk[leaderId].create(nodePath + index++, + new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // wait some time to let this get written to disk + Thread.sleep(500); + + // 9. reload data from disk and make sure it's still consistent + LOG.info("restarting follower {}", followerA); + mt[followerA].shutdown(); + waitForOne(zk[followerA], States.CONNECTING); + mt[followerA].start(); + waitForOne(zk[followerA], States.CONNECTED); + + for (int i = 0; i < index; i++) { + assertNotNull( "node " + i + " should exist", zk[followerA].exists(nodePath + i, false)); + } + + } finally { + System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC); + for (int i = 0; i < ENSEMBLE_SERVERS; i++) { + mt[i].shutdown(); + zk[i].close(); + } + } + } + static class Context { boolean quitFollowing = false; boolean exitWhenAckNewLeader = false; NewLeaderAckCallback newLeaderAckCallback = null; + NewLeaderReceivedCallback newLeaderReceivedCallback = null; } - static interface NewLeaderAckCallback { + interface NewLeaderAckCallback { public void start(); } - static interface StartForwardingListener { + interface NewLeaderReceivedCallback { + void process() throws IOException; + } + + interface StartForwardingListener { public void start(); } - static interface BeginSnapshotListener { + interface BeginSnapshotListener { public void start(); } @@ -1481,6 +1614,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } super.writePacket(pp, flush); } + + @Override + void readPacket(QuorumPacket qp) throws IOException { + super.readPacket(qp); + if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) { + context.newLeaderReceivedCallback.process(); + } + } }; } |