summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnanya Singh <purpul90@gmail.com>2022-02-08 21:12:00 +0530
committerMohammad Arshad <arshad@apache.org>2022-02-08 21:12:00 +0530
commit6f6757b2680af60bd8dd752d81480e71cb458def (patch)
treeace5006d5bbd6f91ae3002973a777402b3b0d0b9
parentfc1764bc976b3c57404686a4b4a69ec154c561c6 (diff)
downloadzookeeper-6f6757b2680af60bd8dd752d81480e71cb458def.tar.gz
ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
Author: Ananya Singh <purpul90@gmail.com> Reviewers: Brahma Reddy Battula <brahma@apache.org>, Norbert Kalmar <nkalmar@apache.org>, Mohammad Arshad <arshad@apache.org> Closes #1790 from AnanyaSingh2121/ZOOKEEPER-4433
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java8
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java12
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java8
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java8
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java8
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java6
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java2
7 files changed, 34 insertions, 18 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 67737c586..2bab9378f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -338,9 +338,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot(){
+ public void takeSnapshot() {
+ takeSnapshot(false);
+ }
+
+ public void takeSnapshot(boolean syncSnap){
try {
- txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+ txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
// This is a severe error that we cannot recover from,
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 6bce62dae..9d9a3e2f2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -37,6 +37,7 @@ import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.server.DataTree;
@@ -215,12 +216,15 @@ public class FileSnap implements SnapShot {
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
+ * @param fsync sync the file immediately after write
*/
- public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
throws IOException {
if (!close) {
- try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
- CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
+ try (CheckedOutputStream crcOut =
+ new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
+ new FileOutputStream(snapShot)),
+ new Adler32())) {
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
@@ -228,7 +232,7 @@ public class FileSnap implements SnapShot {
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
- sessOS.flush();
+ crcOut.flush();
}
} else {
throw new IOException("FileSnap has already been closed");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 850841342..4881e2844 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -245,7 +245,7 @@ public class FileTxnSnapLog {
}
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
- save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
+ save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
/* return a zxid of zero, since we the database is empty */
return 0;
}
@@ -394,16 +394,18 @@ public class FileTxnSnapLog {
* @param dataTree the datatree to be serialized onto disk
* @param sessionsWithTimeouts the session timeouts to be
* serialized onto disk
+ * @param syncSnap sync the snapshot immediately after write
* @throws IOException
*/
public void save(DataTree dataTree,
- ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
+ ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
+ boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
- snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
+ snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
index c964afc6d..257c12d95 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java
@@ -44,11 +44,13 @@ public interface SnapShot {
/**
* persist the datatree and the sessions into a persistence storage
* @param dt the datatree to be serialized
- * @param sessions
+ * @param sessions the session timeouts to be serialized
+ * @param name the object name to store snapshot into
+ * @param fsync sync the snapshot immediately after write
* @throws IOException
*/
- void serialize(DataTree dt, Map<Long, Integer> sessions,
- File name)
+ void serialize(DataTree dt, Map<Long, Integer> sessions,
+ File name, boolean fsync)
throws IOException;
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 3f282cac2..51b103882 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -383,6 +383,7 @@ public class Learner {
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
+ boolean syncSnapshot = false;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -409,6 +410,9 @@ public class Learner {
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+ // immediately persist the latest snapshot when there is txn log gap
+ syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
@@ -535,7 +539,7 @@ public class Learner {
}
}
if (isPreZAB1_0) {
- zk.takeSnapshot();
+ zk.takeSnapshot(syncSnapshot);
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
@@ -555,7 +559,7 @@ public class Learner {
}
if (snapshotNeeded) {
- zk.takeSnapshot();
+ zk.takeSnapshot(syncSnapshot);
}
self.setCurrentEpoch(newEpoch);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
index 754e40b13..4c24ba8a1 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java
@@ -366,7 +366,7 @@ public class Zab1_0Test extends ZKTestCase {
Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
// Generate snapshot and close files.
- snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+ snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
snapLog.close();
QuorumPeer peer = createQuorumPeer(tmpDir);
@@ -649,7 +649,7 @@ public class Zab1_0Test extends ZKTestCase {
Assert.assertEquals(1, f.self.getAcceptedEpoch());
Assert.assertEquals(1, f.self.getCurrentEpoch());
//Make sure that we did take the snapshot now
- verify(f.zk).takeSnapshot();
+ verify(f.zk).takeSnapshot(true);
Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
// Make sure the data was recorded in the filesystem ok
@@ -1246,7 +1246,7 @@ public class Zab1_0Test extends ZKTestCase {
FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
File version2 = new File(tmpDir, "version-2");
version2.mkdir();
- logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>());
+ logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);
long zxid = ZxidUtils.makeZxid(3, 3);
logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
index 6be1d3690..211a293d0 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
@@ -77,7 +77,7 @@ public class TruncateTest extends ZKTestCase {
ZKDatabase zkdb = new ZKDatabase(snaplog);
// make sure to snapshot, so that we have something there when
// truncateLog reloads the db
- snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts());
+ snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);
for (int i = 1; i <= 100; i++) {
append(zkdb, i);