diff options
Diffstat (limited to 'zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java')
-rw-r--r-- | zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java | 222 |
1 files changed, 158 insertions, 64 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 848583a0f..1911b5a57 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.File; import java.io.FileInputStream; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory; public class Commands { static final Logger LOG = LoggerFactory.getLogger(Commands.class); + static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS"; + private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000")); /** Maps command names to Command instances */ private static Map<String, Command> commands = new HashMap<String, Command>(); @@ -100,16 +103,16 @@ public class Commands { * * @param cmdName * @param zkServer - * @param kwargs String-valued keyword arguments to the command + * @param kwargs String-valued keyword arguments to the command from HTTP GET request * (may be null if command requires no additional arguments) * @return Map representing response to command containing at minimum: * - "command" key containing the command's primary name * - "error" key containing a String error message or null if no error */ - public static CommandResponse runCommand( - String cmdName, - ZooKeeperServer zkServer, - Map<String, String> kwargs) { + public static CommandResponse runGetCommand( + String cmdName, + ZooKeeperServer zkServer, + Map<String, String> kwargs) { Command command = getCommand(cmdName); if (command == null) { // set the status code to 200 to keep the current behavior of existing commands @@ -119,7 +122,36 @@ public class Commands { // set the status code to 200 to keep the current behavior of existing commands return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); } - return command.run(zkServer, kwargs); + return command.runGet(zkServer, kwargs); + } + + /** + * Run the registered command with name cmdName. Commands should not produce + * any exceptions; any (anticipated) errors should be reported in the + * "error" entry of the returned map. Likewise, if no command with the given + * name is registered, this will be noted in the "error" entry. + * + * @param cmdName + * @param zkServer + * @param inputStream InputStream from HTTP POST request + * @return Map representing response to command containing at minimum: + * - "command" key containing the command's primary name + * - "error" key containing a String error message or null if no error + */ + public static CommandResponse runPostCommand( + String cmdName, + ZooKeeperServer zkServer, + InputStream inputStream) { + Command command = getCommand(cmdName); + if (command == null) { + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK); + } + if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) { + // set the status code to 200 to keep the current behavior of existing commands + return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK); + } + return command.runPost(zkServer, inputStream); } /** @@ -152,6 +184,7 @@ public class Commands { registerCommand(new LeaderCommand()); registerCommand(new MonitorCommand()); registerCommand(new ObserverCnxnStatResetCommand()); + registerCommand(new RestoreCommand()); registerCommand(new RuokCommand()); registerCommand(new SetTraceMaskCommand()); registerCommand(new SnapshotCommand()); @@ -170,14 +203,14 @@ public class Commands { /** * Reset all connection statistics. */ - public static class CnxnStatResetCommand extends CommandBase { + public static class CnxnStatResetCommand extends GetCommand { public CnxnStatResetCommand() { super(Arrays.asList("connection_stat_reset", "crst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.getServerCnxnFactory().resetAllConnectionStats(); return response; @@ -190,14 +223,14 @@ public class Commands { * Server configuration parameters. * @see ZooKeeperServer#getConf() */ - public static class ConfCommand extends CommandBase { + public static class ConfCommand extends GetCommand { public ConfCommand() { super(Arrays.asList("configuration", "conf", "config")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.putAll(zkServer.getConf().toMap()); return response; @@ -210,14 +243,14 @@ public class Commands { * - "connections": list of connection info objects * @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean) */ - public static class ConsCommand extends CommandBase { + public static class ConsCommand extends GetCommand { public ConsCommand() { super(Arrays.asList("connections", "cons")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory(); if (serverCnxnFactory != null) { @@ -239,14 +272,14 @@ public class Commands { /** * Information on ZK datadir and snapdir size in bytes */ - public static class DirsCommand extends CommandBase { + public static class DirsCommand extends GetCommand { public DirsCommand() { super(Arrays.asList("dirs")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("datadir_size", zkServer.getDataDirSize()); response.put("logdir_size", zkServer.getLogDirSize()); @@ -264,14 +297,14 @@ public class Commands { * @see ZooKeeperServer#getSessionExpiryMap() * @see ZooKeeperServer#getEphemerals() */ - public static class DumpCommand extends CommandBase { + public static class DumpCommand extends GetCommand { public DumpCommand() { super(Arrays.asList("dump")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap()); response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals()); @@ -283,14 +316,14 @@ public class Commands { /** * All defined environment variables. */ - public static class EnvCommand extends CommandBase { + public static class EnvCommand extends GetCommand { public EnvCommand() { super(Arrays.asList("environment", "env", "envi"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); for (Entry e : Environment.list()) { response.put(e.getKey(), e.getValue()); @@ -303,14 +336,14 @@ public class Commands { /** * Digest histories for every specific number of txns. */ - public static class DigestCommand extends CommandBase { + public static class DigestCommand extends GetCommand { public DigestCommand() { super(Arrays.asList("hash")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog()); return response; @@ -322,14 +355,14 @@ public class Commands { * The current trace mask. Returned map contains: * - "tracemask": Long */ - public static class GetTraceMaskCommand extends CommandBase { + public static class GetTraceMaskCommand extends GetCommand { public GetTraceMaskCommand() { super(Arrays.asList("get_trace_mask", "gtmk"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("tracemask", ZooTrace.getTextTraceLevel()); return response; @@ -337,14 +370,14 @@ public class Commands { } - public static class InitialConfigurationCommand extends CommandBase { + public static class InitialConfigurationCommand extends GetCommand { public InitialConfigurationCommand() { super(Arrays.asList("initial_configuration", "icfg")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("initial_configuration", zkServer.getInitialConfig()); return response; @@ -356,14 +389,14 @@ public class Commands { * Is this server in read-only mode. Returned map contains: * - "is_read_only": Boolean */ - public static class IsroCommand extends CommandBase { + public static class IsroCommand extends GetCommand { public IsroCommand() { super(Arrays.asList("is_read_only", "isro")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer); return response; @@ -380,14 +413,14 @@ public class Commands { * - "zxid": String * - "timestamp": Long */ - public static class LastSnapshotCommand extends CommandBase { + public static class LastSnapshotCommand extends GetCommand { public LastSnapshotCommand() { super(Arrays.asList("last_snapshot", "lsnp")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo(); response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid)); @@ -400,14 +433,14 @@ public class Commands { /** * Returns the leader status of this instance and the leader host string. */ - public static class LeaderCommand extends CommandBase { + public static class LeaderCommand extends GetCommand { public LeaderCommand() { super(Arrays.asList("leader", "lead")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { response.put("is_leader", zkServer instanceof LeaderZooKeeperServer); @@ -450,14 +483,14 @@ public class Commands { * - "synced_followers": Integer (leader only) * - "pending_syncs": Integer (leader only) */ - public static class MonitorCommand extends CommandBase { + public static class MonitorCommand extends GetCommand { public MonitorCommand() { super(Arrays.asList("monitor", "mntr"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.dumpMonitorValues(response::put); ServerMetrics.getMetrics().getMetricsProvider().dump(response::put); @@ -470,14 +503,14 @@ public class Commands { /** * Reset all observer connection statistics. */ - public static class ObserverCnxnStatResetCommand extends CommandBase { + public static class ObserverCnxnStatResetCommand extends GetCommand { public ObserverCnxnStatResetCommand() { super(Arrays.asList("observer_connection_stat_reset", "orst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof LeaderZooKeeperServer) { Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader(); @@ -492,16 +525,80 @@ public class Commands { } /** + * Restore from snapshot on the current server. + * + * Returned map contains: + * - "last_zxid": String + */ + public static class RestoreCommand extends PostCommand { + static final String RESPONSE_DATA_LAST_ZXID = "last_zxid"; + + static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled"; + + + private RateLimiter rateLimiter; + + public RestoreCommand() { + super(Arrays.asList("restore", "rest")); + rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS); + } + + @Override + public CommandResponse runPost(final ZooKeeperServer zkServer, final InputStream inputStream) { + final CommandResponse response = initializeResponse(); + + // check feature flag + final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false")); + if (!restoreEnabled) { + response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + LOG.warn("Restore command is disabled"); + return response; + } + + if (!zkServer.isSerializeLastProcessedZxidEnabled()) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + LOG.warn("Restore command requires serializeLastProcessedZxidEnable flag is set to true"); + return response; + } + + if (inputStream == null){ + response.setStatusCode(HttpServletResponse.SC_BAD_REQUEST); + LOG.warn("InputStream from restore request is null"); + return response; + } + + // check rate limiting + if (!rateLimiter.allow()) { + response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429); + ServerMetrics.getMetrics().RESTORE_RATE_LIMITED_COUNT.add(1); + LOG.warn("Restore request was rate limited"); + return response; + } + + // restore from snapshot InputStream + try { + final long lastZxid = zkServer.restoreFromSnapshot(inputStream); + response.put(RESPONSE_DATA_LAST_ZXID, lastZxid); + } catch (final Exception e) { + response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + ServerMetrics.getMetrics().RESTORE_ERROR_COUNT.add(1); + LOG.warn("Exception occurred when restore snapshot via the restore command", e); + } + return response; + } + } + + /** * No-op command, check if the server is running */ - public static class RuokCommand extends CommandBase { + public static class RuokCommand extends GetCommand { public RuokCommand() { super(Arrays.asList("ruok")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { return initializeResponse(); } @@ -513,14 +610,14 @@ public class Commands { * Returned Map contains: * - "tracemask": Long */ - public static class SetTraceMaskCommand extends CommandBase { + public static class SetTraceMaskCommand extends GetCommand { public SetTraceMaskCommand() { super(Arrays.asList("set_trace_mask", "stmk"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); long traceMask; if (!kwargs.containsKey("traceMask")) { @@ -551,28 +648,25 @@ public class Commands { * - "last_zxid": String * - "snapshot_size": String */ - public static class SnapshotCommand extends CommandBase { + public static class SnapshotCommand extends GetCommand { static final String REQUEST_QUERY_PARAM_STREAMING = "streaming"; static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid"; static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size"; static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled"; - static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS"; - - private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000")); private final RateLimiter rateLimiter; public SnapshotCommand() { super(Arrays.asList("snapshot", "snap")); - rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS); + rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS); } @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION", justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter") @Override - public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) { + public CommandResponse runGet(final ZooKeeperServer zkServer, final Map<String, String> kwargs) { final CommandResponse response = initializeResponse(); // check feature flag @@ -637,7 +731,7 @@ public class Commands { * - "server_stats": ServerStats object * - "node_count": Integer */ - public static class SrvrCommand extends CommandBase { + public static class SrvrCommand extends GetCommand { public SrvrCommand() { super(Arrays.asList("server_stats", "srvr")); @@ -649,7 +743,7 @@ public class Commands { } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); LOG.info("running stat"); response.put("version", Version.getFullVersion()); @@ -676,8 +770,8 @@ public class Commands { } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { - CommandResponse response = super.run(zkServer, kwargs); + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { + CommandResponse response = super.runGet(zkServer, kwargs); final Iterable<Map<String, Object>> connections; if (zkServer.getServerCnxnFactory() != null) { @@ -702,14 +796,14 @@ public class Commands { /** * Resets server statistics. */ - public static class StatResetCommand extends CommandBase { + public static class StatResetCommand extends GetCommand { public StatResetCommand() { super(Arrays.asList("stat_reset", "srst")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); zkServer.serverStats().reset(); return response; @@ -723,14 +817,14 @@ public class Commands { * - "observers": list of observer learner handler info objects (leader/follower only) * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo() */ - public static class SyncedObserverConsCommand extends CommandBase { + public static class SyncedObserverConsCommand extends GetCommand { public SyncedObserverConsCommand() { super(Arrays.asList("observers", "obsr")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); @@ -760,14 +854,14 @@ public class Commands { /** * All defined system properties. */ - public static class SystemPropertiesCommand extends CommandBase { + public static class SystemPropertiesCommand extends GetCommand { public SystemPropertiesCommand() { super(Arrays.asList("system_properties", "sysp"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); Properties systemProperties = System.getProperties(); SortedMap<String, String> sortedSystemProperties = new TreeMap<>(); @@ -782,14 +876,14 @@ public class Commands { * Returns the current ensemble configuration information. * It provides list of current voting members in the ensemble. */ - public static class VotingViewCommand extends CommandBase { + public static class VotingViewCommand extends GetCommand { public VotingViewCommand() { super(Arrays.asList("voting_view")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; @@ -850,14 +944,14 @@ public class Commands { * @see DataTree#getWatches() * @see DataTree#getWatches() */ - public static class WatchCommand extends CommandBase { + public static class WatchCommand extends GetCommand { public WatchCommand() { super(Arrays.asList("watches", "wchc")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.put("session_id_to_watched_paths", dt.getWatches().toMap()); @@ -871,14 +965,14 @@ public class Commands { * - "path_to_session_ids": Map<String, Set<Long>> path -> session IDs of sessions watching path * @see DataTree#getWatchesByPath() */ - public static class WatchesByPathCommand extends CommandBase { + public static class WatchesByPathCommand extends GetCommand { public WatchesByPathCommand() { super(Arrays.asList("watches_by_path", "wchp")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.put("path_to_session_ids", dt.getWatchesByPath().toMap()); @@ -891,14 +985,14 @@ public class Commands { * Summarized watch information. * @see DataTree#getWatchesSummary() */ - public static class WatchSummaryCommand extends CommandBase { + public static class WatchSummaryCommand extends GetCommand { public WatchSummaryCommand() { super(Arrays.asList("watch_summary", "wchs")); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { DataTree dt = zkServer.getZKDatabase().getDataTree(); CommandResponse response = initializeResponse(); response.putAll(dt.getWatchesSummary().toMap()); @@ -911,14 +1005,14 @@ public class Commands { * Returns the current phase of Zab protocol that peer is running. * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST */ - public static class ZabStateCommand extends CommandBase { + public static class ZabStateCommand extends GetCommand { public ZabStateCommand() { super(Arrays.asList("zabstate"), false); } @Override - public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) { + public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) { CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; |