summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Mukti Krishnan <muktikrishnan94@gmail.com> 2021-03-09 21:46:52 +0530
committer Mate Szalay-Beko <mszalay@cloudera.com> 2022-05-17 10:03:09 +0200
commit 161e50574baf6725dec00c547a5005f19588587d (patch)
tree e94da8e4e5168a7d9ca5aedba2ff209b305072f6
parent f68019ab9c2e0ee362b890e1a7a0a449afda3a7c (diff)
download zookeeper-161e50574baf6725dec00c547a5005f19588587d.tar.gz
ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.
Based on https://github.com/apache/zookeeper/pull/1224; fixed unit test build issue. Author: Fangmin Lyu <fangmin@apache.org>. Author: Michael Han <hanm@apache.org>. Reviewers: Enrico Olivelli <eolivelli@apache.org>. Originally developed by Fangmin Lyu <fangmin@apache.org>. Closes #1515 from hanm/ZOOKEEPER-3642. (cherry picked from commit a53cfeb26e1e1b9b6b1e29fe7bd9f0277b8fff9a)
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java | 3
-rw-r--r-- zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java | 4
-rw-r--r-- zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java | 147
3 files changed, 150 insertions, 4 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index a12c1117d..130861354 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -612,6 +612,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
+ if (fullyShutDown && zkDb != null) {
+ zkDb.clear();
+ }
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 74f9f4ff1..7b22c060e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -687,7 +687,9 @@ public class Learner {
closeSocket();
// shutdown previous zookeeper
if (zk != null) {
- zk.shutdown();
+ // If we haven't finished SNAP sync, force fully shutdown
+ // to avoid potential inconsistency
+ zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index ff42204db..a1eeaa3ac 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.apache.zookeeper.test.ClientBase.createEmptyTestDir;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
@@ -1412,21 +1413,153 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
}
+ /**
+ * If a learner fails during SNAP sync with the leader before the
+ * snapshot has been written to disk, it may later do a DIFF sync
+ * with a new leader, or itself be elected as the leader.
+ *
+ * This test is trying to guarantee there is no data inconsistency for
+ * this case.
+ */
+ @Test
+ public void testDiffSyncAfterSnap() throws Exception {
+ final int ENSEMBLE_SERVERS = 3;
+ MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+ ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+ try {
+ // 1. start a quorum
+ final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique()
+ + ":participant;127.0.0.1:" + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+
+ // start servers
+ Context[] contexts = new Context[ENSEMBLE_SERVERS];
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ final Context context = new Context();
+ contexts[i] = context;
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+ @Override
+ public TestQPMain getTestQPMain() {
+ return new CustomizedQPMain(context);
+ }
+ };
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ }
+ waitForAll(zk, States.CONNECTED);
+ LOG.info("all servers started");
+
+ final String nodePath = "/testDiffSyncAfterSnap";
+
+ // 2. find leader and a follower
+ int leaderId = -1;
+ int followerA = -1;
+ for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ leaderId = i;
+ } else if (followerA == -1) {
+ followerA = i;
+ }
+ }
+
+ // 3. stop follower A
+ LOG.info("shutdown follower {}", followerA);
+ mt[followerA].shutdown();
+ waitForOne(zk[followerA], States.CONNECTING);
+
+ // 4. issue some traffic
+ int index = 0;
+ int numOfRequests = 10;
+ for (int i = 0; i < numOfRequests; i++) {
+ zk[leaderId].create(nodePath + index++,
+ new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+ // 5. inject fault to cause the follower exit when received NEWLEADER
+ contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
+ boolean processed = false;
+ @Override
+ public void process() throws IOException {
+ if (processed) {
+ return;
+ }
+ processed = true;
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+ throw new IOException("read timedout");
+ }
+ };
+
+ // 6. force snap sync once
+ LOG.info("force snapshot sync");
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+ // 7. start follower A
+ mt[followerA].start();
+ waitForOne(zk[followerA], States.CONNECTED);
+ LOG.info("verify the nodes are exist in memory");
+ for (int i = 0; i < index; i++) {
+ assertNotNull(zk[followerA].exists(nodePath + i, false));
+ }
+
+ // 8. issue another request which will be persisted on disk
+ zk[leaderId].create(nodePath + index++,
+ new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // wait some time to let this get written to disk
+ Thread.sleep(500);
+
+ // 9. reload data from disk and make sure it's still consistent
+ LOG.info("restarting follower {}", followerA);
+ mt[followerA].shutdown();
+ waitForOne(zk[followerA], States.CONNECTING);
+ mt[followerA].start();
+ waitForOne(zk[followerA], States.CONNECTED);
+
+ for (int i = 0; i < index; i++) {
+ assertNotNull( "node " + i + " should exist", zk[followerA].exists(nodePath + i, false));
+ }
+
+ } finally {
+ System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+ for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+ mt[i].shutdown();
+ zk[i].close();
+ }
+ }
+ }
+
static class Context {
boolean quitFollowing = false;
boolean exitWhenAckNewLeader = false;
NewLeaderAckCallback newLeaderAckCallback = null;
+ NewLeaderReceivedCallback newLeaderReceivedCallback = null;
}
- static interface NewLeaderAckCallback {
+ interface NewLeaderAckCallback {
public void start();
}
- static interface StartForwardingListener {
+ interface NewLeaderReceivedCallback {
+ void process() throws IOException;
+ }
+
+ interface StartForwardingListener {
public void start();
}
- static interface BeginSnapshotListener {
+ interface BeginSnapshotListener {
public void start();
}
@@ -1481,6 +1614,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
super.writePacket(pp, flush);
}
+
+ @Override
+ void readPacket(QuorumPacket qp) throws IOException {
+ super.readPacket(qp);
+ if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) {
+ context.newLeaderReceivedCallback.process();
+ }
+ }
};
}