From b069edeb2436a051ad2b18e03fd67ae8ec23875c Mon Sep 17 00:00:00 2001 From: li4wang <68786536+li4wang@users.noreply.github.com> Date: Wed, 4 Jan 2023 07:40:32 -0800 Subject: ZOOKEEPER-4570: Admin server API for taking snapshot and stream out data (#1943) Provides a snapshot command for taking snapshot and streaming out data Author: Li Wang Co-authored-by: Li Wang Co-authored-by: Enrico Olivelli --- .../src/main/resources/markdown/zookeeperAdmin.md | 51 +++- .../java/org/apache/zookeeper/server/DataTree.java | 36 +++ .../org/apache/zookeeper/server/ServerMetrics.java | 12 + .../apache/zookeeper/server/ZooKeeperServer.java | 52 ++++- .../zookeeper/server/admin/CommandOutputter.java | 6 +- .../zookeeper/server/admin/CommandResponse.java | 78 ++++++- .../apache/zookeeper/server/admin/Commands.java | 102 +++++++- .../zookeeper/server/admin/JettyAdminServer.java | 26 ++- .../zookeeper/server/admin/JsonOutputter.java | 6 +- .../zookeeper/server/admin/StreamOutputter.java | 52 +++++ .../server/auth/IPAuthenticationProvider.java | 17 ++ .../zookeeper/server/persistence/FileSnap.java | 10 + .../server/persistence/FileTxnSnapLog.java | 4 +- .../apache/zookeeper/server/util/RateLimiter.java | 60 +++++ .../org/apache/zookeeper/TakeSnapshotTest.java | 103 +++++++++ .../test/java/org/apache/zookeeper/ZKTestCase.java | 4 + .../org/apache/zookeeper/server/DataTreeTest.java | 55 +++++ .../zookeeper/server/SnapshotDigestTest.java | 3 +- .../apache/zookeeper/server/TxnLogDigestTest.java | 3 + .../server/admin/CommandResponseTest.java | 7 +- .../zookeeper/server/admin/CommandsTest.java | 93 ++++++-- .../server/admin/JettyAdminServerTest.java | 6 +- .../server/admin/SnapshotCommandTest.java | 257 +++++++++++++++++++++ .../server/persistence/FileTxnSnapLogTest.java | 8 + .../server/quorum/FuzzySnapshotRelatedTest.java | 12 +- .../zookeeper/server/util/RateLimiterTest.java | 62 +++++ 26 files changed, 1073 insertions(+), 52 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 8e19f0418..b4c01d35d 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1208,7 +1208,32 @@ property, when available, is noted below. The default value is false. - +* *serializeLastProcessedZxid.enabled* + (Jave system property: **zookeeper.serializeLastProcessedZxid.enabled**) + **New in 3.9.0:** + If enabled, ZooKeeper serializes the lastProcessedZxid when snapshot and deserializes it + when restore. Defaults to true. Needs to be enabled for performing snapshot and restore + via admin server commands, as there is no snapshot file name to extract the lastProcessedZxid. + + This feature is backward and forward compatible. Here are the different scenarios. + + 1. Snapshot triggered by server internally + a. When loading old snapshot with new code, it will throw EOFException when trying to + read the non-exist lastProcessedZxid value, and the exception will be caught. + The lastProcessedZxid will be set using the snapshot file name. + + b. When loading new snapshot with old code, it will finish successfully after deserializing the + digest value, the lastProcessedZxid at the end of snapshot file will be ignored. + The lastProcessedZxid will be set using the snapshot file name. + + 2. Sync up between leader and follower + The lastProcessedZxid will not be serialized by leader and deserialized by follower + in both new and old code. It will be set to the lastProcessedZxid sent from leader + via QuorumPacket. + + 3. Snapshot triggered via admin server APIs + The feature flag need to be enabled for the snapshot command to work. + #### Cluster Options @@ -2087,6 +2112,20 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t #### AdminServer configuration +**New in 3.9.0:** The following +options are used to configure the [AdminServer](#sc_adminserver). + +* *admin.snapshot.enabled* : + (Java system property: **zookeeper.admin.snapshot.enabled**) + The flag for enabling the snapshot command. Defaults to false. + It will be enabled by default once the auth support for admin server commands + is available. + +* *admin.snapshot.intervalInMS* : + (Java system property: **zookeeper.admin.snapshot.intervalInMS**) + The time interval for rate limiting snapshot command to protect the server. + Defaults to 5 mins. + **New in 3.7.1:** The following options are used to configure the [AdminServer](#sc_adminserver). @@ -2620,6 +2659,16 @@ Available commands include: Server information. Returns multiple fields giving a brief overview of server state. +* *snapshot/snap* : + Takes a snapshot of the current server in the datadir and stream out data. + Optional query parameter: + "streaming": Boolean (defaults to true if the parameter is not present) + Returns the following via Http headers: + "last_zxid": String + "snapshot_size": String + Note: this API is rate-limited (once every 5 mins by default) to protect the server + from being over-loaded. + * *stats/stat* : Same as *server_stats* but also returns the "connections" field (see *connections* for details). diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 2818e15aa..cd45a7c34 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -1744,6 +1744,42 @@ public class DataTree { } } + /** + * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name. + * This is needed for performing snapshot and restore via admin server commands. + * + * @param oa the output stream to write to + * @return true if the lastProcessedZxid is serialized successfully, otherwise false + * @throws IOException if there is an I/O error + */ + public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException { + if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { + return false; + } + oa.writeLong(lastProcessedZxid, "lastZxid"); + return true; + } + + /** + * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field. + * + * @param ia the input stream to read from + * @return true if lastProcessedZxid is deserialized successfully, otherwise false + * @throws IOException if there is an I/O error + */ + public boolean deserializeLastProcessedZxid(final InputArchive ia) throws IOException { + if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { + return false; + } + try { + lastProcessedZxid = ia.readLong("lastZxid"); + } catch (final EOFException e) { + LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot."); + return false; + } + return true; + } + /** * Compares the actual tree's digest with that in the snapshot. * Resets digestFromLoadedSnapshot after comparision. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index af156542f..ef28e32e1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -72,6 +72,8 @@ public final class ServerMetrics { FSYNC_TIME = metricsContext.getSummary("fsynctime", DetailLevel.BASIC); SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC); + SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count"); + SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count"); DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC); READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED); UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED); @@ -276,6 +278,16 @@ public final class ServerMetrics { */ public final Summary SNAPSHOT_TIME; + /** + * Snapshot error count + */ + public final Counter SNAPSHOT_ERROR_COUNT; + + /** + * Snapshot rate limited count + */ + public final Counter SNAPSHOT_RATE_LIMITED_COUNT; + /** * Db init time (snapshot loading + txnlog replay) */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index f6c2b93eb..d61f269c9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -120,6 +120,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; + public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; + private static boolean serializeLastProcessedZxidEnabled; + // Add a enable/disable option for now, we should remove this one when // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; @@ -153,6 +156,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + + setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( + System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); } // @VisibleForTesting @@ -535,23 +541,46 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot() { + public void takeSnapshot() throws IOException { takeSnapshot(false); } - public void takeSnapshot(boolean syncSnap) { + public void takeSnapshot(boolean syncSnap) throws IOException { + takeSnapshot(syncSnap, true, false); + } + + /** + * Takes a snapshot on the server. + * + * @param syncSnap syncSnap sync the snapshot immediately after write + * @param isSevere if true system exist, otherwise throw IOException + * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions + * + * @return file snapshot file object + * @throws IOException + */ + public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { long start = Time.currentElapsedTime(); + File snapFile = null; try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + if (fastForwardFromEdits) { + zkDb.fastForwardDataBase(); + } + snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { - LOG.error("Severe unrecoverable error, exiting", e); - // This is a severe error that we cannot recover from, - // so we need to exit - ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + if (isSevere) { + LOG.error("Severe unrecoverable error, exiting", e); + // This is a severe error that we cannot recover from, + // so we need to exit + ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + } else { + throw e; + } } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in {} ms", elapsed); ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); + return snapFile; } public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { @@ -2139,6 +2168,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ZooKeeperServer.digestEnabled = digestEnabled; } + public static boolean isSerializeLastProcessedZxidEnabled() { + return serializeLastProcessedZxidEnabled; + } + + public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { + serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; + LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); + } + /** * Trim a path to get the immediate predecessor. * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java index a8fe8bdc0..b9a87830e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandOutputter.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server.admin; +import java.io.OutputStream; import java.io.PrintWriter; /** @@ -31,6 +32,9 @@ public interface CommandOutputter { /** The MIME type of this output (e.g., "application/json") */ String getContentType(); - void output(CommandResponse response, PrintWriter pw); + /** Print out data as output */ + default void output(CommandResponse response, PrintWriter pw) {} + /** Stream out data as output */ + default void output(final CommandResponse response, final OutputStream os) {} } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java index d9e7239a1..f47b4e942 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandResponse.java @@ -17,8 +17,11 @@ package org.apache.zookeeper.server.admin; +import java.io.InputStream; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import javax.servlet.http.HttpServletResponse; /** * A response from running a {@link Command}. @@ -37,6 +40,9 @@ public class CommandResponse { private final String command; private final String error; private final Map data; + private final Map headers; + private int statusCode; + private InputStream inputStream; /** * Creates a new response with no error string. @@ -44,18 +50,35 @@ public class CommandResponse { * @param command command name */ public CommandResponse(String command) { - this(command, null); + this(command, null, HttpServletResponse.SC_OK); } /** * Creates a new response. * * @param command command name * @param error error string (may be null) + * @param statusCode http status code */ - public CommandResponse(String command, String error) { + public CommandResponse(String command, String error, int statusCode) { + this(command, error, statusCode, null); + } + + + /** + * Creates a new response. + * + * @param command command name + * @param error error string (may be null) + * @param statusCode http status code + * @param inputStream inputStream to send out data (may be null) + */ + public CommandResponse(final String command, final String error, final int statusCode, final InputStream inputStream) { this.command = command; this.error = error; data = new LinkedHashMap(); + headers = new HashMap<>(); + this.statusCode = statusCode; + this.inputStream = inputStream; } /** @@ -76,6 +99,38 @@ public class CommandResponse { return error; } + /** + * Gets the http status code + * + * @return http status code + */ + public int getStatusCode() { + return statusCode; + } + + /** + * Sets the http status code + */ + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + /** + * Gets the InputStream (may be null). + * + * @return InputStream + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * Sets the InputStream + */ + public void setInputStream(final InputStream inputStream) { + this.inputStream = inputStream; + } + /** * Adds a key/value pair to this response. * @@ -96,6 +151,25 @@ public class CommandResponse { data.putAll(m); } + /** + * Adds a header to this response. + * + * @param name name of the header + * @param value value of the header + */ + public void addHeader(final String name, final String value) { + headers.put(name, value); + } + + /** + * Returns all headers + * + * @return map representation of all headers + */ + public Map getHeaders() { + return headers; + } + /** * Converts this response to a map. The returned map is mutable, and * changes to it do not reflect back into this response. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 236c7ec24..848583a0f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -18,8 +18,11 @@ package org.apache.zookeeper.server.admin; +import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.File; +import java.io.FileInputStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -31,7 +34,9 @@ import java.util.Properties; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.servlet.http.HttpServletResponse; import org.apache.zookeeper.Environment; import org.apache.zookeeper.Environment.Entry; import org.apache.zookeeper.Version; @@ -41,6 +46,7 @@ import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.persistence.SnapshotInfo; +import org.apache.zookeeper.server.persistence.Util; import org.apache.zookeeper.server.quorum.Follower; import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; import org.apache.zookeeper.server.quorum.Leader; @@ -51,7 +57,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.RateLimiter; import org.apache.zookeeper.server.util.ZxidUtils; +import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,10 +112,12 @@ public class Commands { Map kwargs) { Command command = getCommand(cmdName); if (command == null) { - return new CommandResponse(cmdName, "Unknown command: " + cmdName); + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK); } if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) { - return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests"); + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); } return command.run(zkServer, kwargs); } @@ -144,6 +154,7 @@ public class Commands { registerCommand(new ObserverCnxnStatResetCommand()); registerCommand(new RuokCommand()); registerCommand(new SetTraceMaskCommand()); + registerCommand(new SnapshotCommand()); registerCommand(new SrvrCommand()); registerCommand(new StatCommand()); registerCommand(new StatResetCommand()); @@ -530,6 +541,93 @@ public class Commands { } + /** + * Take a snapshot of current server and stream out the data. + * + * Argument: + * - "streaming": optional String to indicate whether streaming out data + * + * Returned snapshot as stream if streaming is true and metadata of the snapshot + * - "last_zxid": String + * - "snapshot_size": String + */ + public static class SnapshotCommand extends CommandBase { + static final String REQUEST_QUERY_PARAM_STREAMING = "streaming"; + + static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid"; + static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size"; + + static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled"; + static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS"; + + private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000")); + + private final RateLimiter rateLimiter; + + public SnapshotCommand() { + super(Arrays.asList("snapshot", "snap")); + rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS); + } + + @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION", + justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter") + @Override + public CommandResponse run(final ZooKeeperServer zkServer, final Map kwargs) { + final CommandResponse response = initializeResponse(); + + // check feature flag + final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false")); + if (!snapshotEnabled) { + response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + LOG.warn("Snapshot command is disabled"); + return response; + } + + if (!zkServer.isSerializeLastProcessedZxidEnabled()) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + LOG.warn("Snapshot command requires serializeLastProcessedZxidEnable flag is set to true"); + return response; + } + + // check rate limiting + if (!rateLimiter.allow()) { + response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429); + ServerMetrics.getMetrics().SNAPSHOT_RATE_LIMITED_COUNT.add(1); + LOG.warn("Snapshot request was rate limited"); + return response; + } + + // check the streaming query param + boolean streaming = true; + if (kwargs.containsKey(REQUEST_QUERY_PARAM_STREAMING)) { + streaming = Boolean.parseBoolean(kwargs.get(REQUEST_QUERY_PARAM_STREAMING)); + } + + // take snapshot and stream out data if needed + try { + final File snapshotFile = zkServer.takeSnapshot(false, false, true); + final long lastZxid = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX); + response.addHeader(RESPONSE_HEADER_LAST_ZXID, "0x" + ZxidUtils.zxidToString(lastZxid)); + + final long size = snapshotFile.length(); + response.addHeader(RESPONSE_HEADER_SNAPSHOT_SIZE, String.valueOf(size)); + + if (size == 0) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1); + LOG.warn("Snapshot file {} is empty", snapshotFile); + } else if (streaming) { + response.setInputStream(new FileInputStream(snapshotFile)); + } + } catch (final Exception e) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().SNAPSHOT_ERROR_COUNT.add(1); + LOG.warn("Exception occurred when taking the snapshot via the snapshot admin command", e); + } + return response; + } + } + /** * Server information. Returned map contains: * - "version": String diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java index 99241dcbd..3c82f8552 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java @@ -33,6 +33,7 @@ import org.apache.zookeeper.common.QuorumX509Util; import org.apache.zookeeper.common.SecretUtils; import org.apache.zookeeper.common.X509Util; import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.auth.IPAuthenticationProvider; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; @@ -259,15 +260,26 @@ public class JettyAdminServer implements AdminServer { } // Run the command - CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs); + final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs); + response.setStatus(cmdResponse.getStatusCode()); - // Format and print the output of the command - CommandOutputter outputter = new JsonOutputter(); - response.setStatus(HttpServletResponse.SC_OK); - response.setContentType(outputter.getContentType()); - outputter.output(cmdResponse, response.getWriter()); + final Map headers = cmdResponse.getHeaders(); + for (final Map.Entry header : headers.entrySet()) { + response.addHeader(header.getKey(), header.getValue()); + } + final String clientIP = IPAuthenticationProvider.getClientIPAddress(request); + if (cmdResponse.getInputStream() == null) { + // Format and print the output of the command + CommandOutputter outputter = new JsonOutputter(clientIP); + response.setContentType(outputter.getContentType()); + outputter.output(cmdResponse, response.getWriter()); + } else { + // Stream out the output of the command + CommandOutputter outputter = new StreamOutputter(clientIP); + response.setContentType(outputter.getContentType()); + outputter.output(cmdResponse, response.getOutputStream()); + } } - } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java index 7d9457453..44c88de9d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JsonOutputter.java @@ -35,12 +35,14 @@ public class JsonOutputter implements CommandOutputter { public static final String ERROR_RESPONSE = "{\"error\": \"Exception writing command response to JSON\"}"; private ObjectMapper mapper; + private final String clientIP; - public JsonOutputter() { + public JsonOutputter(final String clientIP) { mapper = new ObjectMapper(); mapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true); mapper.configure(SerializationFeature.INDENT_OUTPUT, true); mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + this.clientIP = clientIP; } @Override @@ -59,7 +61,7 @@ public class JsonOutputter implements CommandOutputter { LOG.warn("Exception writing command response to JSON:", e); pw.write(ERROR_RESPONSE); } catch (IOException e) { - LOG.warn("Exception writing command response to JSON:", e); + LOG.warn("Exception writing command response as JSON to {}", clientIP, e); pw.write(ERROR_RESPONSE); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java new file mode 100644 index 000000000..e8f68e649 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.admin; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.zookeeper.common.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class for streaming data out. + */ +public class StreamOutputter implements CommandOutputter{ + private static final Logger LOG = LoggerFactory.getLogger(StreamOutputter.class); + private final String clientIP; + + public StreamOutputter(final String clientIP) { + this.clientIP = clientIP; + } + + @Override + public String getContentType() { + return "application/octet-stream"; + } + + @Override + public void output(final CommandResponse response, final OutputStream os) { + try (final InputStream is = response.getInputStream()){ + IOUtils.copyBytes(is, os, 1024, true); + } catch (final IOException e) { + LOG.error("Exception occurred when streaming out data to {}", clientIP, e); + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java index b93e55a32..9f6fb4005 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java @@ -18,11 +18,14 @@ package org.apache.zookeeper.server.auth; +import java.util.StringTokenizer; +import javax.servlet.http.HttpServletRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.ServerCnxn; public class IPAuthenticationProvider implements AuthenticationProvider { + private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For"; public String getScheme() { return "ip"; @@ -128,4 +131,18 @@ public class IPAuthenticationProvider implements AuthenticationProvider { return true; } + /** + * Returns the HTTP(s) client IP address + * @param request HttpServletRequest + * @return IP address + */ + public static String getClientIPAddress(final HttpServletRequest request) { + // to handle the case that a HTTP(s) client connects via a proxy or load balancer + final String xForwardedForHeader = request.getHeader(X_FORWARDED_FOR_HEADER_NAME); + if (xForwardedForHeader == null) { + return request.getRemoteAddr(); + } + // the format of the field is: X-Forwarded-For: client, proxy1, proxy2 ... + return new StringTokenizer(xForwardedForHeader, ",").nextToken().trim(); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java index df8a59c1a..1a91d1c30 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java @@ -99,6 +99,11 @@ public class FileSnap implements SnapShot { SnapStream.checkSealIntegrity(snapIS, ia); } + // deserialize the last processed zxid and check the intact + if (dt.deserializeLastProcessedZxid(ia)) { + SnapStream.checkSealIntegrity(snapIS, ia); + } + foundValid = true; break; } catch (IOException e) { @@ -255,6 +260,11 @@ public class FileSnap implements SnapShot { SnapStream.sealStream(snapOS, oa); } + // serialize the last processed zxid and add another CRC check + if (dt.serializeLastProcessedZxid(oa)) { + SnapStream.sealStream(snapOS, oa); + } + lastSnapshotInfo = new SnapshotInfo( Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX), snapShot.lastModified() / 1000); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 403720bda..16f9cf71e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -468,9 +468,10 @@ public class FileTxnSnapLog { * @param sessionsWithTimeouts the session timeouts to be * serialized onto disk * @param syncSnap sync the snapshot immediately after write + * @return the snapshot file * @throws IOException */ - public void save( + public File save( DataTree dataTree, ConcurrentHashMap sessionsWithTimeouts, boolean syncSnap) throws IOException { @@ -479,6 +480,7 @@ public class FileTxnSnapLog { LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); + return snapshotFile; } catch (IOException e) { if (snapshotFile.length() == 0) { /* This may be caused by a full disk. In such a case, the server diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java new file mode 100644 index 000000000..cb9473306 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RateLimiter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zookeeper.common.Time; + +/** + * A class that provides simple interval-based rate limiting implementation. + */ +public class RateLimiter { + private final int rate; + private final long intervalInMs; + private long lastTimeReset; + private final AtomicInteger remained; + + public RateLimiter(final int rate, final long interval, final TimeUnit unit) { + this.rate = rate; + this.intervalInMs = unit.toMillis(interval); + this.lastTimeReset = Time.currentElapsedTime(); + this.remained = new AtomicInteger(rate); + } + + public boolean allow() { + final long now = Time.currentElapsedTime(); + + // reset the rate if interval passed + if (now > lastTimeReset + intervalInMs) { + remained.set(rate); + lastTimeReset = now; + } + + int value = remained.get(); + boolean allowed = false; + + // to handle race condition + while (!allowed && value > 0) { + allowed = remained.compareAndSet(value, value - 1); + value = remained.get(); + } + return allowed; + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java new file mode 100644 index 000000000..a9953b7c0 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.File; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TakeSnapshotTest extends ClientBase { + private static final String BASE_PATH = "/takeSnapshotTest"; + private static final int NODE_COUNT = 100; + private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); + private ZooKeeper zk; + + @TempDir + static File dataDir; + + @TempDir + static File logDir; + + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + ClientBase.setupTestEnv(); + } + + @AfterEach + public void tearDown() throws Exception { + if (zk != null) { + zk.close(); + } + } + + @Test + public void testTakeSnapshotAndRestore() throws Exception { + ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); + + final int port = Integer.parseInt(HOSTPORT.split(":")[1]); + final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1); + serverCnxnFactory.startup(zks); + assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); + + try { + zk = ClientBase.createZKClient(HOSTPORT); + for (int i = 0; i < NODE_COUNT; i++) { + final String path = BASE_PATH + "-" + i; + zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // takeSnapshot + zks.takeSnapshot(false, false, true); + + // clean up + zk.close(); + zks.shutdown(); + + // start server again and assert the data restored from snapshot + zks = new ZooKeeperServer(dataDir, logDir, 3000); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false); + + serverCnxnFactory.startup(zks); + assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT)); + + zk = ClientBase.createZKClient(HOSTPORT); + for (int i = 0; i < NODE_COUNT; i++) { + final String path = BASE_PATH + "-" + i; + final String expectedData = String.valueOf(i); + assertArrayEquals(expectedData.getBytes(), zk.getData(path, null, null)); + } + assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/")); + } finally { + zks.shutdown(); + serverCnxnFactory.shutdown(); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index eb8fcbf93..826a875b4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -71,6 +71,10 @@ public class ZKTestCase { // accidentally attempting to start multiple admin servers on the // same port. System.setProperty("zookeeper.admin.enableServer", "false"); + + // disable rate limiting on the snapshot admin API + System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0"); + // ZOOKEEPER-2693 disables all 4lw by default. // Here we enable the 4lw which ZooKeeper tests depends. System.setProperty("zookeeper.4lw.commands.whitelist", "*"); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index 151e87343..07a69f14f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -516,6 +516,21 @@ public class DataTreeTest extends ZKTestCase { } } + @Test + public void testSerializeLastProcessedZxid_Enabled() throws Exception { + testSerializeLastProcessedZxid(true, true); + } + + @Test + public void testSerializeLastProcessedZxid_Disabled() throws Exception { + testSerializeLastProcessedZxid(false, false); + } + + @Test + public void testSerializeLastProcessedZxid_BackwardCompatibility() throws Exception { + testSerializeLastProcessedZxid(true, false); + } + @Test public void testDataTreeMetrics() throws Exception { ServerMetrics.getMetrics().resetAll(); @@ -616,4 +631,44 @@ public class DataTreeTest extends ZKTestCase { } } + private DataTree buildDataTreeForTest() { + final DataTree dt = new DataTree(); + assertEquals(dt.lastProcessedZxid, 0); + + dt.processTxn( + new TxnHeader(100, 1000, 1, 30, ZooDefs.OpCode.create), + new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), + null); + assertEquals(dt.lastProcessedZxid, 1); + return dt; + } + + private void testSerializeLastProcessedZxid(boolean enableForSerialize, boolean enableForDeserialize) throws Exception{ + final DataTree dt = buildDataTreeForTest(); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForSerialize); + final BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); + if (enableForSerialize) { + assertTrue(dt.serializeLastProcessedZxid(oa)); + } else { + assertFalse(dt.serializeLastProcessedZxid(oa)); + } + baos.flush(); + + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enableForDeserialize); + try (final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) { + final InputArchive ia = BinaryInputArchive.getArchive(bais); + if (enableForDeserialize) { + assertTrue(dt.deserializeLastProcessedZxid(ia)); + } else { + assertFalse(dt.deserializeLastProcessedZxid(ia)); + } + assertEquals(dt.lastProcessedZxid, 1); + } + } finally { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); + } + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java index 89dac35a3..4e8c6f8e2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java @@ -166,7 +166,7 @@ public class SnapshotDigestTest extends ClientBase { private void testCompatibleHelper(Boolean enabledBefore, Boolean enabledAfter) throws Exception { ZooKeeperServer.setDigestEnabled(enabledBefore); - + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledBefore); // restart the server to cache the option change reloadSnapshotAndCheckDigest(); @@ -179,6 +179,7 @@ public class SnapshotDigestTest extends ClientBase { server.takeSnapshot(); ZooKeeperServer.setDigestEnabled(enabledAfter); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(enabledAfter); reloadSnapshotAndCheckDigest(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java index 6f6a29099..ae58fa563 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java @@ -80,11 +80,13 @@ public class TxnLogDigestTest extends ClientBase { @Override public void setupCustomizedEnv() { ZooKeeperServer.setDigestEnabled(true); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); } @Override public void cleanUpCustomizedEnv() { ZooKeeperServer.setDigestEnabled(false); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false); } @BeforeAll @@ -189,6 +191,7 @@ public class TxnLogDigestTest extends ClientBase { QuorumPeerMainTest.waitForOne(zk, States.CONNECTING); ZooKeeperServer.setDigestEnabled(digestEnabled); + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled); startServer(); QuorumPeerMainTest.waitForOne(zk, States.CONNECTED); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java index cfa130690..8009dab47 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandResponseTest.java @@ -18,8 +18,10 @@ package org.apache.zookeeper.server.admin; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.util.HashMap; import java.util.Map; +import javax.servlet.http.HttpServletResponse; import org.apache.zookeeper.ZKTestCase; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -30,13 +32,16 @@ public class CommandResponseTest extends ZKTestCase { @BeforeEach public void setUp() throws Exception { - r = new CommandResponse("makemeasandwich", "makeityourself"); + r = new CommandResponse("makemeasandwich", "makeityourself", HttpServletResponse.SC_OK); } @Test public void testGetters() { assertEquals("makemeasandwich", r.getCommand()); assertEquals("makeityourself", r.getError()); + assertEquals(HttpServletResponse.SC_OK, r.getStatusCode()); + assertEquals(new HashMap(), r.getHeaders()); + assertNull(r.getInputStream()); } @Test diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java index ff8dc2033..2d74776e3 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java @@ -18,18 +18,25 @@ package org.apache.zookeeper.server.admin; +import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import javax.servlet.http.HttpServletResponse; import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerStats; @@ -50,37 +57,52 @@ public class CommandsTest extends ClientBase { * - the primary name of the command * @param kwargs * - keyword arguments to the command + * @param expectedHeaders + * - expected HTTP response headers + * @param expectedStatusCode + * - expected HTTP status code * @param fields * - the fields that are expected in the returned Map * @throws IOException * @throws InterruptedException */ - public void testCommand(String cmdName, Map kwargs, Field... fields) throws IOException, InterruptedException { + private void testCommand(String cmdName, Map kwargs, + Map expectedHeaders, int expectedStatusCode, + Field... fields) throws IOException, InterruptedException { ZooKeeperServer zks = serverFactory.getZooKeeperServer(); - Map result = Commands.runCommand(cmdName, zks, kwargs).toMap(); - - assertTrue(result.containsKey("command")); - // This is only true because we're setting cmdName to the primary name - assertEquals(cmdName, result.remove("command")); - assertTrue(result.containsKey("error")); - assertNull(result.remove("error"), "error: " + result.get("error")); - - for (Field field : fields) { - String k = field.key; - assertTrue(result.containsKey(k), - "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result); - Class t = field.type; - Object v = result.remove(k); - assertTrue(t.isAssignableFrom(v.getClass()), - "\"" + k + "\" field from command " + cmdName - + " should be of type " + t + ", is actually of type " + v.getClass()); + final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs); + assertNotNull(commandResponse); + assertEquals(expectedStatusCode, commandResponse.getStatusCode()); + try (final InputStream responseStream = commandResponse.getInputStream()) { + if (Boolean.parseBoolean(kwargs.getOrDefault(REQUEST_QUERY_PARAM_STREAMING, "false"))) { + assertNotNull(responseStream, "InputStream in the response of command " + cmdName + " should not be null"); + } else { + Map result = commandResponse.toMap(); + assertTrue(result.containsKey("command")); + // This is only true because we're setting cmdName to the primary name + assertEquals(cmdName, result.remove("command")); + assertTrue(result.containsKey("error")); + assertNull(result.remove("error"), "error: " + result.get("error")); + + for (Field field : fields) { + String k = field.key; + assertTrue(result.containsKey(k), + "Result from command " + cmdName + " missing field \"" + k + "\"" + "\n" + result); + Class t = field.type; + Object v = result.remove(k); + assertTrue(t.isAssignableFrom(v.getClass()), + "\"" + k + "\" field from command " + cmdName + + " should be of type " + t + ", is actually of type " + v.getClass()); + } + + assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result); + } } - - assertTrue(result.isEmpty(), "Result from command " + cmdName + " contains extra fields: " + result); + assertEquals(expectedHeaders, commandResponse.getHeaders()); } public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException { - testCommand(cmdName, new HashMap(), fields); + testCommand(cmdName, new HashMap(), new HashMap<>(), HttpServletResponse.SC_OK, fields); } private static class Field { @@ -91,7 +113,16 @@ public class CommandsTest extends ClientBase { this.key = key; this.type = type; } + } + @Test + public void testSnapshot_streaming() throws IOException, InterruptedException { + testSnapshot(true); + } + + @Test + public void testSnapshot_nonStreaming() throws IOException, InterruptedException { + testSnapshot(false); } @Test @@ -208,7 +239,7 @@ public class CommandsTest extends ClientBase { public void testSetTraceMask() throws IOException, InterruptedException { Map kwargs = new HashMap(); kwargs.put("traceMask", "1"); - testCommand("set_trace_mask", kwargs, new Field("tracemask", Long.class)); + testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class)); } @Test @@ -288,4 +319,22 @@ public class CommandsTest extends ClientBase { assertThat(response.toMap().containsKey("secure_connections"), is(true)); } + private void testSnapshot(final boolean streaming) throws IOException, InterruptedException { + System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true"); + System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0"); + System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"); + try { + final Map kwargs = new HashMap<>(); + kwargs.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming)); + final Map expectedHeaders = new HashMap<>(); + expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0"); + expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478"); + testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK); + } finally { + System.clearProperty(ADMIN_SNAPSHOT_ENABLED); + System.clearProperty(ADMIN_SNAPSHOT_INTERVAL); + System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED); + } + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java index c269e1994..b06cde6fd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java @@ -61,9 +61,9 @@ public class JettyAdminServerTest extends ZKTestCase { protected static final Logger LOG = LoggerFactory.getLogger(JettyAdminServerTest.class); - private static final String URL_FORMAT = "http://localhost:%d/commands"; - private static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands"; - private static final int jettyAdminPort = PortAssignment.unique(); + static final String URL_FORMAT = "http://localhost:%d/commands"; + static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands"; + static final int jettyAdminPort = PortAssignment.unique(); @BeforeEach public void enableServer() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java new file mode 100644 index 000000000..5b5dbd979 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.admin; + +import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL; +import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING; +import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT; +import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import javax.servlet.http.HttpServletResponse; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.IOUtils; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SnapshotCommandTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class); + + private static final String PATH = "/snapshot_test"; + private static final int NODE_COUNT = 10; + + private final String hostPort = "127.0.0.1:" + PortAssignment.unique(); + private ServerCnxnFactory cnxnFactory; + private JettyAdminServer adminServer; + private ZooKeeperServer zks; + private ZooKeeper zk; + + @TempDir + static File dataDir; + + @TempDir + static File logDir; + + @BeforeAll + public void setup() throws Exception { + // start ZookeeperServer + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + zks = new ZooKeeperServer(dataDir, logDir, 3000); + final int port = Integer.parseInt(hostPort.split(":")[1]); + cnxnFactory = ServerCnxnFactory.createFactory(port, -1); + cnxnFactory.startup(zks); + assertTrue(ClientBase.waitForServerUp(hostPort, 120000)); + + // start AdminServer + System.setProperty("zookeeper.admin.enableServer", "true"); + System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort); + System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true"); + System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0"); + System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"); + + adminServer = new JettyAdminServer(); + adminServer.setZooKeeperServer(zks); + adminServer.start(); + + // create Zookeeper client and test data + zk = ClientBase.createZKClient(hostPort); + createData(zk, NODE_COUNT); + } + + @AfterAll + public void tearDown() throws Exception { + System.clearProperty("zookeeper.4lw.commands.whitelist"); + System.clearProperty("zookeeper.admin.enableServer"); + System.clearProperty("zookeeper.admin.serverPort"); + System.clearProperty(ADMIN_SNAPSHOT_ENABLED); + System.clearProperty(ADMIN_SNAPSHOT_INTERVAL); + System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED); + + if (zk != null) { + zk.close(); + } + + if (adminServer != null) { + adminServer.shutdown(); + } + + if (cnxnFactory != null) { + cnxnFactory.shutdown(); + } + + if (zks != null) { + zks.shutdown(); + } + } + + @Test + public void testSnapshotCommand_streaming() throws Exception { + // take snapshot with streaming + final HttpURLConnection snapshotConn = sendSnapshotRequest(true); + + // validate snapshot response + assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode()); + validateResponseHeaders(snapshotConn); + final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis()); + try (final InputStream inputStream = snapshotConn.getInputStream(); + final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) { + IOUtils.copyBytes(inputStream, outputStream, 1024, true); + final long fileSize = Files.size(snapshotFile.toPath()); + assertTrue(fileSize > 0); + } + } + + @Test + public void testSnapshotCommand_nonStreaming() throws Exception { + // take snapshot without streaming + final HttpURLConnection snapshotConn = sendSnapshotRequest(false); + + // validate snapshot response + assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode()); + validateResponseHeaders(snapshotConn); + displayResponsePayload(snapshotConn); + } + + @Test + public void testSnapshotCommand_disabled() throws Exception { + System.setProperty(ADMIN_SNAPSHOT_ENABLED, "false"); + try { + // take snapshot + final HttpURLConnection snapshotConn = sendSnapshotRequest(true); + + // validate snapshot response + assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, snapshotConn.getResponseCode()); + } finally { + System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true"); + } + } + + @Test + public void testSnapshotCommand_serializeLastZxidDisabled() throws Exception { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false); + try { + // take snapshot + final HttpURLConnection snapshotConn = sendSnapshotRequest(true); + + // validate snapshot response + assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, snapshotConn.getResponseCode()); + } finally { + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true); + } + } + + private void createData(final ZooKeeper zk, final long count) throws Exception { + try { + zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (final KeeperException.NodeExistsException ignore) { + // ignore + } + + for (int i = 0; i < count; i++) { + final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + LOG.info("Node created. path={}" + processNodePath); + } + } + + private HttpURLConnection sendSnapshotRequest(final boolean streaming) throws Exception { + final String queryParamsStr = buildQueryStringForSnapshotCommand(streaming); + final URL snapshotURL = new URL(String.format(URL_FORMAT + "/snapshot", jettyAdminPort) + "?" + queryParamsStr); + final HttpURLConnection snapshotConn = (HttpURLConnection) snapshotURL.openConnection(); + snapshotConn.setRequestMethod("GET"); + + return snapshotConn; + } + + private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception { + final Map parameters = new HashMap<>(); + parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming)); + return getParamsString(parameters); + } + + private static String getParamsString(final Map params) throws UnsupportedEncodingException { + final StringBuilder result = new StringBuilder(); + + for (final Map.Entry entry : params.entrySet()) { + result.append(URLEncoder.encode(entry.getKey(), "UTF-8")); + result.append("="); + + result.append(URLEncoder.encode(entry.getValue(), "UTF-8")); + result.append("&"); + } + + final String resultString = result.toString(); + return resultString.length() > 0 + ? resultString.substring(0, resultString.length() - 1) + : resultString; + } + + private void validateResponseHeaders(final HttpURLConnection conn) { + LOG.info("Header:{}, Value:{}", + Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, + conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID)); + assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID)); + + LOG.info("Header:{}, Value:{}", + Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, + conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)); + assertNotNull(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)); + assertTrue(Integer.parseInt(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)) > 0); + } + + private void displayResponsePayload(final HttpURLConnection conn) throws IOException { + final StringBuilder sb = new StringBuilder(); + try (final BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + sb.append(inputLine); + } + LOG.info("Response payload: {}", sb); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java index 44a009934..466cf3f4b 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogTest.java @@ -393,6 +393,10 @@ public class FileTxnSnapLogTest { SnapStream.setStreamMode(snappyEnabled ? SnapStream.StreamMode.SNAPPY : SnapStream.StreamMode.DEFAULT_MODE); ZooKeeperServer.setDigestEnabled(digestEnabled); + // set the flag to be the same as digestEnabled to make sure the last serialized data + // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward + // compatibility test. + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled); TxnHeader txnHeader = new TxnHeader(1, 1, 1, 1 + 1, ZooDefs.OpCode.create); CreateTxn txn = new CreateTxn("/" + 1, "data".getBytes(), null, false, 1); Request request = new Request(1, 1, 1, txnHeader, txn, 1); @@ -401,6 +405,10 @@ public class FileTxnSnapLogTest { int expectedNodeCount = dataTree.getNodeCount(); ZooKeeperServer.setDigestEnabled(!digestEnabled); + // set the flag to be the same as digestEnabled to make sure the last serialized data + // (for example, datatree, digest, lastProcessedZxid) is setup as expected for backward + // compatibility test. + ZooKeeperServer.setSerializeLastProcessedZxidEnabled(!digestEnabled); snaplog.restore(dataTree, sessions, (hdr, rec, digest) -> { }); assertEquals(expectedNodeCount, dataTree.getNodeCount()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java index c39bc54e0..a75fb3830 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java @@ -150,7 +150,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { @Override public void process(String path) { LOG.info("Take a snapshot"); - zkServer.takeSnapshot(true); + try { + zkServer.takeSnapshot(true); + } catch (final IOException e) { + // ignored as it should never reach here because of System.exit() call + } } }); @@ -373,7 +377,11 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase { public void process(long sessionId) { LOG.info("Take snapshot"); if (shouldTakeSnapshot.getAndSet(false)) { - zkServer.takeSnapshot(true); + try { + zkServer.takeSnapshot(true); + } catch (IOException e) { + // ignored as it should never reach here because of System.exit() call + } } } }); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java new file mode 100644 index 000000000..b19e96823 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/RateLimiterTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +public class RateLimiterTest { + + @Test + public void testAllow_withinInterval() { + final int rate = 2; + final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS); + for (int i = 0; i < rate; i++) { + assertTrue(rateLimiter.allow()); + } + assertFalse(rateLimiter.allow()); + } + + @Test + public void testAllow_withinInterval_multiThreaded() { + final int rate = 10; + + final RateLimiter rateLimiter = new RateLimiter(rate, 5, TimeUnit.SECONDS); + final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(rate + 1); + for (int i = 0; i < rate; i++) { + executor.execute(() -> assertTrue(rateLimiter.allow())); + } + executor.execute(() -> assertFalse(rateLimiter.allow())); + } + + @Test + public void testAllow_exceedInterval() throws Exception { + final int interval = 1; + + final RateLimiter rateLimiter = new RateLimiter(1, interval, TimeUnit.SECONDS); + assertTrue(rateLimiter.allow()); + assertFalse(rateLimiter.allow()); + Thread.sleep(TimeUnit.SECONDS.toMillis(interval + 1)); + assertTrue(rateLimiter.allow()); + } +} -- cgit v1.2.1