diff options
author | Ananya Singh <purpul90@gmail.com> | 2022-02-08 21:12:00 +0530 |
---|---|---|
committer | Mohammad Arshad <arshad@apache.org> | 2022-02-08 21:12:00 +0530 |
commit | 6f6757b2680af60bd8dd752d81480e71cb458def (patch) | |
tree | ace5006d5bbd6f91ae3002973a777402b3b0d0b9 | |
parent | fc1764bc976b3c57404686a4b4a69ec154c561c6 (diff) | |
download | zookeeper-6f6757b2680af60bd8dd752d81480e71cb458def.tar.gz |
ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
Author: Ananya Singh <purpul90@gmail.com>
Reviewers: Brahma Reddy Battula <brahma@apache.org>, Norbert Kalmar <nkalmar@apache.org>, Mohammad Arshad <arshad@apache.org>
Closes #1790 from AnanyaSingh2121/ZOOKEEPER-4433
7 files changed, 34 insertions, 18 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 67737c586..2bab9378f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -338,9 +338,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot(){ + public void takeSnapshot() { + takeSnapshot(false); + } + + public void takeSnapshot(boolean syncSnap){ try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java index 6bce62dae..9d9a3e2f2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java @@ -37,6 +37,7 @@ import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; +import org.apache.zookeeper.common.AtomicFileOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.server.DataTree; @@ -215,12 +216,15 @@ public class FileSnap implements SnapShot { * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into + * @param fsync sync the file immediately after write */ - public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot) + public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException { if (!close) { - try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot)); - CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) { + try (CheckedOutputStream crcOut = + new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) : + new FileOutputStream(snapShot)), + new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); @@ -228,7 +232,7 @@ public class FileSnap implements SnapShot { long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); - sessOS.flush(); + crcOut.flush(); } } else { throw new IOException("FileSnap has already been closed"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 850841342..4881e2844 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -245,7 +245,7 @@ public class FileTxnSnapLog { } /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ - save(dt, (ConcurrentHashMap<Long, Integer>)sessions); + save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false); /* return a zxid of zero, since we the database is empty */ return 0; } @@ -394,16 +394,18 @@ public class FileTxnSnapLog { * @param dataTree the datatree to be serialized onto disk * @param sessionsWithTimeouts the session timeouts to be * serialized onto disk + * @param syncSnap sync the snapshot immediately after write * @throws IOException */ public void save(DataTree dataTree, - ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) + ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, + boolean syncSnap) throws IOException { long lastZxid = dataTree.lastProcessedZxid; File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); - snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); + snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java index c964afc6d..257c12d95 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java @@ -44,11 +44,13 @@ public interface SnapShot { /** * persist the datatree and the sessions into a persistence storage * @param dt the datatree to be serialized - * @param sessions + * @param sessions the session timeouts to be serialized + * @param name the object name to store snapshot into + * @param fsync sync the snapshot immediately after write * @throws IOException */ - void serialize(DataTree dt, Map<Long, Integer> sessions, - File name) + void serialize(DataTree dt, Map<Long, Integer> sessions, + File name, boolean fsync) throws IOException; /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 3f282cac2..51b103882 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -383,6 +383,7 @@ public class Learner { // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; + boolean syncSnapshot = false; readPacket(qp); LinkedList<Long> packetsCommitted = new LinkedList<Long>(); LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>(); @@ -409,6 +410,9 @@ public class Learner { throw new IOException("Missing signature"); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + // immediately persist the latest snapshot when there is txn log gap + syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" @@ -535,7 +539,7 @@ public class Learner { } } if (isPreZAB1_0) { - zk.takeSnapshot(); + zk.takeSnapshot(syncSnapshot); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); @@ -555,7 +559,7 @@ public class Learner { } if (snapshotNeeded) { - zk.takeSnapshot(); + zk.takeSnapshot(syncSnapshot); } self.setCurrentEpoch(newEpoch); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 754e40b13..4c24ba8a1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -366,7 +366,7 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. - snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); + snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); @@ -649,7 +649,7 @@ public class Zab1_0Test extends ZKTestCase { Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); //Make sure that we did take the snapshot now - verify(f.zk).takeSnapshot(); + verify(f.zk).takeSnapshot(true); Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok @@ -1246,7 +1246,7 @@ public class Zab1_0Test extends ZKTestCase { FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir); File version2 = new File(tmpDir, "version-2"); version2.mkdir(); - logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>()); + logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false); long zxid = ZxidUtils.makeZxid(3, 3); logFactory.append(new Request(1, 1, ZooDefs.OpCode.error, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error), diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java index 6be1d3690..211a293d0 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java @@ -77,7 +77,7 @@ public class TruncateTest extends ZKTestCase { ZKDatabase zkdb = new ZKDatabase(snaplog); // make sure to snapshot, so that we have something there when // truncateLog reloads the db - snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts()); + snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false); for (int i = 1; i <= 100; i++) { append(zkdb, i); |