diff options
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java')
-rw-r--r-- | zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java | 82 |
1 files changed, 78 insertions, 4 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index d61f269c9..bbb0f7fee 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -19,9 +19,11 @@ package org.apache.zookeeper.server; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -34,11 +36,16 @@ import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; import javax.security.sasl.SaslException; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; @@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; private static boolean closeSessionTxnEnabled = true; + private volatile CountDownLatch restoreLatch; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot() throws IOException { - takeSnapshot(false); + public File takeSnapshot() throws IOException { + return takeSnapshot(false); } - public void takeSnapshot(boolean syncSnap) throws IOException { - takeSnapshot(syncSnap, true, false); + public File takeSnapshot(boolean syncSnap) throws IOException { + return takeSnapshot(syncSnap, true, false); } /** @@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return snapFile; } + /** + * Restores database from a snapshot. It is used by the restore admin server command. + * + * @param inputStream input stream of snapshot + * @Return last processed zxid + * @throws IOException + */ + public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { + if (inputStream == null) { + throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); + } + + long start = Time.currentElapsedTime(); + LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + // restore to a new zkDatabase + final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); + final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); + final InputArchive ia = BinaryInputArchive.getArchive(cis); + newZKDatabase.deserializeSnapshot(ia, cis); + LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + newZKDatabase.getDataTreeLastProcessedZxid(), + newZKDatabase.dataTree.getNodeCount(), + newZKDatabase.getSessionCount()); + + // create a CountDownLatch + restoreLatch = new CountDownLatch(1); + + try { + // set to the new zkDatabase + setZKDatabase(newZKDatabase); + + // re-create SessionTrack + createSessionTracker(); + } finally { + // unblock request submission + restoreLatch.countDown(); + restoreLatch = null; + } + + LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + long elapsed = Time.currentElapsedTime() - start; + LOG.info("Restore taken in {} ms", elapsed); + ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); + + return getLastProcessedZxid(); + } + public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); } @@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * <li>During shutdown the server sets the state to SHUTDOWN, which * corresponds to the server not running.</li></ul> * + * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE + * </li></ul> + * * @param state new server state. */ protected void setState(State state) { @@ -1151,6 +1217,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } public void submitRequest(Request si) { + if (restoreLatch != null) { + try { + LOG.info("Blocking request submission while restore is in progress"); + restoreLatch.await(); + } catch (final InterruptedException e) { + LOG.warn("Unexpected interruption", e); + } + } enqueueRequest(si); } |