summaryrefslogtreecommitdiff
path: root/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java')
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java82
1 files changed, 78 insertions, 4 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index d61f269c9..bbb0f7fee 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -19,9 +19,11 @@
package org.apache.zookeeper.server;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -34,11 +36,16 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
import javax.security.sasl.SaslException;
+import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
@@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
+ private volatile CountDownLatch restoreLatch;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot() throws IOException {
- takeSnapshot(false);
+ public File takeSnapshot() throws IOException {
+ return takeSnapshot(false);
}
- public void takeSnapshot(boolean syncSnap) throws IOException {
- takeSnapshot(syncSnap, true, false);
+ public File takeSnapshot(boolean syncSnap) throws IOException {
+ return takeSnapshot(syncSnap, true, false);
}
/**
@@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return snapFile;
}
+ /**
+ * Restores database from a snapshot. It is used by the restore admin server command.
+ *
+ * @param inputStream input stream of snapshot
+ * @Return last processed zxid
+ * @throws IOException
+ */
+ public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
+ if (inputStream == null) {
+ throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
+ }
+
+ long start = Time.currentElapsedTime();
+ LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ // restore to a new zkDatabase
+ final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
+ final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
+ final InputArchive ia = BinaryInputArchive.getArchive(cis);
+ newZKDatabase.deserializeSnapshot(ia, cis);
+ LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ newZKDatabase.getDataTreeLastProcessedZxid(),
+ newZKDatabase.dataTree.getNodeCount(),
+ newZKDatabase.getSessionCount());
+
+ // create a CountDownLatch
+ restoreLatch = new CountDownLatch(1);
+
+ try {
+ // set to the new zkDatabase
+ setZKDatabase(newZKDatabase);
+
+ // re-create SessionTrack
+ createSessionTracker();
+ } finally {
+ // unblock request submission
+ restoreLatch.countDown();
+ restoreLatch = null;
+ }
+
+ LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ long elapsed = Time.currentElapsedTime() - start;
+ LOG.info("Restore taken in {} ms", elapsed);
+ ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
+
+ return getLastProcessedZxid();
+ }
+
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
}
@@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* <li>During shutdown the server sets the state to SHUTDOWN, which
* corresponds to the server not running.</li></ul>
*
+ * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
+ * </li></ul>
+ *
* @param state new server state.
*/
protected void setState(State state) {
@@ -1151,6 +1217,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public void submitRequest(Request si) {
+ if (restoreLatch != null) {
+ try {
+ LOG.info("Blocking request submission while restore is in progress");
+ restoreLatch.await();
+ } catch (final InterruptedException e) {
+ LOG.warn("Unexpected interruption", e);
+ }
+ }
enqueueRequest(si);
}