summaryrefslogtreecommitdiff
path: root/src/redis-cli.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r--src/redis-cli.c263
1 files changed, 212 insertions, 51 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 964ae59ef..d8e6b966a 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -45,6 +45,7 @@
#include <fcntl.h>
#include <limits.h>
#include <math.h>
+#include <termios.h>
#include <hiredis.h>
#ifdef USE_OPENSSL
@@ -172,6 +173,9 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
int *spectrum_palette;
int spectrum_palette_size;
+static int orig_termios_saved = 0;
+static struct termios orig_termios; /* To restore terminal at exit.*/
+
/* Dict Helpers */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(dict *d, const void *key1,
@@ -267,12 +271,14 @@ static struct config {
int eval_ldb_end; /* Lua debugging session ended. */
int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
int last_cmd_type;
+ redisReply *last_reply;
int verbose;
int set_errcode;
clusterManagerCommand cluster_manager_command;
int no_auth_warning;
- int resp2;
+ int resp2; /* value of 1: specified explicitly with option -2 */
int resp3; /* value of 1: specified explicitly, value of 2: implicit like --json option */
+ int current_resp3; /* 1 if we have RESP3 right now in the current connection. */
int in_multi;
int pre_multi_dbnum;
} config;
@@ -335,6 +341,9 @@ static void cliRefreshPrompt(void) {
if (config.in_multi)
prompt = sdscatlen(prompt,"(TX)",4);
+ if (config.pubsub_mode)
+ prompt = sdscatfmt(prompt,"(subscribed mode)");
+
/* Copy the prompt in the static buffer. */
prompt = sdscatlen(prompt,"> ",2);
snprintf(config.prompt,sizeof(config.prompt),"%s",prompt);
@@ -1017,6 +1026,29 @@ static void freeHintsCallback(void *ptr) {
}
/*------------------------------------------------------------------------------
+ * TTY manipulation
+ *--------------------------------------------------------------------------- */
+
+/* Restore terminal if we've changed it. */
+void cliRestoreTTY(void) {
+ if (orig_termios_saved)
+ tcsetattr(STDIN_FILENO, TCSANOW, &orig_termios);
+}
+
+/* Put the terminal in "press any key" mode */
+static void cliPressAnyKeyTTY(void) {
+ if (!isatty(STDIN_FILENO)) return;
+ if (!orig_termios_saved) {
+ if (tcgetattr(STDIN_FILENO, &orig_termios) == -1) return;
+ atexit(cliRestoreTTY);
+ orig_termios_saved = 1;
+ }
+ struct termios mode = orig_termios;
+ mode.c_lflag &= ~(ECHO | ICANON); /* echoing off, canonical off */
+ tcsetattr(STDIN_FILENO, TCSANOW, &mode);
+}
+
+/*------------------------------------------------------------------------------
* Networking / parsing
*--------------------------------------------------------------------------- */
@@ -1088,6 +1120,7 @@ static int cliSwitchProto(void) {
}
}
freeReplyObject(reply);
+ config.current_resp3 = 1;
return result;
}
@@ -1147,6 +1180,9 @@ static int cliConnect(int flags) {
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
+ /* State of the current connection. */
+ config.current_resp3 = 0;
+
/* Do AUTH, select the right DB, switch to RESP3 if needed. */
if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK)
return REDIS_ERR;
@@ -1309,6 +1345,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
char numsep;
if (r->type == REDIS_REPLY_SET) numsep = '~';
else if (r->type == REDIS_REPLY_MAP) numsep = '#';
+ /* TODO: this would be a breaking change for scripts, do that in a major version. */
+ /* else if (r->type == REDIS_REPLY_PUSH) numsep = '>'; */
else numsep = ')';
snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);
@@ -1351,6 +1389,25 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
return out;
}
+/* Returns 1 if the reply is a pubsub pushed reply. */
+int isPubsubPush(redisReply *r) {
+ if (r == NULL ||
+ r->type != (config.current_resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) ||
+ r->elements < 3 ||
+ r->element[0]->type != REDIS_REPLY_STRING)
+ {
+ return 0;
+ }
+ char *str = r->element[0]->str;
+ size_t len = r->element[0]->len;
+ /* Check if it is [p|s][un]subscribe or [p|s]message, but even simpler, we
+ * just check that it ends with "message" or "subscribe". */
+ return ((len >= strlen("message") &&
+ !strcmp(str + len - strlen("message"), "message")) ||
+ (len >= strlen("subscribe") &&
+ !strcmp(str + len - strlen("subscribe"), "subscribe")));
+}
+
int isColorTerm(void) {
char *t = getenv("TERM");
return t != NULL && strstr(t,"xterm") != NULL;
@@ -1656,6 +1713,11 @@ static int cliReadReply(int output_raw_strings) {
sds out = NULL;
int output = 1;
+ if (config.last_reply) {
+ freeReplyObject(config.last_reply);
+ config.last_reply = NULL;
+ }
+
if (redisGetReply(context,&_reply) != REDIS_OK) {
if (config.blocking_state_aborted) {
config.blocking_state_aborted = 0;
@@ -1682,7 +1744,7 @@ static int cliReadReply(int output_raw_strings) {
return REDIS_ERR; /* avoid compiler warning */
}
- reply = (redisReply*)_reply;
+ config.last_reply = reply = (redisReply*)_reply;
config.last_cmd_type = reply->type;
@@ -1731,15 +1793,78 @@ static int cliReadReply(int output_raw_strings) {
fflush(stdout);
sdsfree(out);
}
- freeReplyObject(reply);
return REDIS_OK;
}
+/* Simultaneously wait for pubsub messages from redis and input on stdin. */
+static void cliWaitForMessagesOrStdin() {
+ int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) ||
+ getenv("FAKETTY"));
+ int use_color = show_info && isColorTerm();
+ cliPressAnyKeyTTY();
+ while (config.pubsub_mode) {
+ /* First check if there are any buffered replies. */
+ redisReply *reply;
+ do {
+ if (redisGetReplyFromReader(context, (void **)&reply) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ if (reply) {
+ sds out = cliFormatReply(reply, config.output, 0);
+ fwrite(out,sdslen(out),1,stdout);
+ fflush(stdout);
+ sdsfree(out);
+ }
+ } while(reply);
+
+ /* Wait for input, either on the Redis socket or on stdin. */
+ struct timeval tv;
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(context->fd, &readfds);
+ FD_SET(STDIN_FILENO, &readfds);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ if (show_info) {
+ if (use_color) printf("\033[1;90m"); /* Bold, bright color. */
+ printf("Reading messages... (press Ctrl-C to quit or any key to type command)\r");
+ if (use_color) printf("\033[0m"); /* Reset color. */
+ fflush(stdout);
+ }
+ select(context->fd + 1, &readfds, NULL, NULL, &tv);
+ if (show_info) {
+ printf("\033[K"); /* Erase current line */
+ fflush(stdout);
+ }
+ if (config.blocking_state_aborted) {
+ /* Ctrl-C pressed */
+ config.blocking_state_aborted = 0;
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ break;
+ } else if (FD_ISSET(context->fd, &readfds)) {
+ /* Message from Redis */
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ } else if (FD_ISSET(STDIN_FILENO, &readfds)) {
+ /* Any key pressed */
+ break;
+ }
+ }
+ cliRestoreTTY();
+}
+
static int cliSendCommand(int argc, char **argv, long repeat) {
char *command = argv[0];
size_t *argvlen;
int j, output_raw;
- int is_unsubscribe_command = 0; /* Is it an unsubscribe related command? */
if (context == NULL) return REDIS_ERR;
@@ -1775,12 +1900,12 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
- if (!strcasecmp(command,"subscribe") ||
- !strcasecmp(command,"psubscribe") ||
- !strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
- if (!strcasecmp(command,"unsubscribe") ||
- !strcasecmp(command,"punsubscribe") ||
- !strcasecmp(command,"sunsubscribe")) is_unsubscribe_command = 1;
+ int is_subscribe = (!strcasecmp(command, "subscribe") ||
+ !strcasecmp(command, "psubscribe") ||
+ !strcasecmp(command, "ssubscribe"));
+ int is_unsubscribe = (!strcasecmp(command, "unsubscribe") ||
+ !strcasecmp(command, "punsubscribe") ||
+ !strcasecmp(command, "sunsubscribe"));
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;
@@ -1812,21 +1937,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
while(repeat < 0 || repeat-- > 0) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
- if (is_unsubscribe_command) {
- /* In unsubscribe related commands, we need to read the specified
- * number of replies according to the number of parameters. */
- argc--; /* Skip the command */
- do {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout);
- } while(--argc);
- zfree(argvlen);
- continue;
- }
-
if (config.monitor_mode) {
do {
if (cliReadReply(output_raw) != REDIS_OK) {
@@ -1843,27 +1953,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_OK;
}
- if (config.pubsub_mode) {
- if (config.output != OUTPUT_RAW)
- printf("Reading messages... (press Ctrl-C to quit)\n");
-
+ int num_expected_pubsub_push = 0;
+ if (is_subscribe || is_unsubscribe) {
+ /* When a push callback is set, redisGetReply (hiredis) loops until
+ * an in-band message is received, but these commands are confirmed
+ * using push replies only. There is one push reply per channel if
+ * channels are specified, otherwise at least one. */
+ num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
-
- while (config.pubsub_mode) {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout); /* Make it grep friendly */
- if (!config.pubsub_mode || config.last_cmd_type == REDIS_REPLY_ERROR) {
- if (config.push_output) {
- redisSetPushCallback(context, cliPushHandler);
- }
- config.pubsub_mode = 0;
- }
- }
- continue;
}
if (config.slave_mode) {
@@ -1874,10 +1972,35 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_ERR; /* Error = slaveMode lost connection to master */
}
- if (cliReadReply(output_raw) != REDIS_OK) {
- zfree(argvlen);
- return REDIS_ERR;
- } else {
+ /* Read response, possibly skipping pubsub/push messages. */
+ while (1) {
+ if (cliReadReply(output_raw) != REDIS_OK) {
+ zfree(argvlen);
+ return REDIS_ERR;
+ }
+ fflush(stdout);
+ if (config.pubsub_mode || num_expected_pubsub_push > 0) {
+ if (isPubsubPush(config.last_reply)) {
+ if (num_expected_pubsub_push > 0 &&
+ !strcasecmp(config.last_reply->element[0]->str, command))
+ {
+ /* This pushed message confirms the
+ * [p|s][un]subscribe command. */
+ if (is_subscribe && !config.pubsub_mode) {
+ config.pubsub_mode = 1;
+ cliRefreshPrompt();
+ }
+ if (--num_expected_pubsub_push > 0) {
+ continue; /* We need more of these. */
+ }
+ } else {
+ continue; /* Skip this pubsub message. */
+ }
+ } else if (config.last_reply->type == REDIS_REPLY_PUSH) {
+ continue; /* Skip other push message. */
+ }
+ }
+
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,"select") && argc == 2 &&
config.last_cmd_type != REDIS_REPLY_ERROR)
@@ -1911,9 +2034,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.in_multi = 0;
config.dbnum = 0;
config.conn_info.input_dbnum = 0;
- config.resp3 = 0;
+ config.current_resp3 = 0;
+ if (config.pubsub_mode && config.push_output) {
+ redisSetPushCallback(context, cliPushHandler);
+ }
+ config.pubsub_mode = 0;
cliRefreshPrompt();
+ } else if (!strcasecmp(command,"hello")) {
+ if (config.last_cmd_type == REDIS_REPLY_MAP) {
+ config.current_resp3 = 1;
+ } else if (config.last_cmd_type == REDIS_REPLY_ARRAY) {
+ config.current_resp3 = 0;
+ }
+ } else if ((is_subscribe || is_unsubscribe) && !config.pubsub_mode) {
+ /* We didn't enter pubsub mode. Restore push callback. */
+ if (config.push_output)
+ redisSetPushCallback(context, cliPushHandler);
}
+
+ break;
}
if (config.cluster_reissue_command){
/* If we need to reissue the command, break to prevent a
@@ -2664,8 +2803,17 @@ static void repl(void) {
}
cliRefreshPrompt();
- while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
- if (line[0] != '\0') {
+ while(1) {
+ line = linenoise(context ? config.prompt : "not connected> ");
+ if (line == NULL) {
+ /* ^C, ^D or similar. */
+ if (config.pubsub_mode) {
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) == REDIS_OK)
+ continue;
+ }
+ break;
+ } else if (line[0] != '\0') {
long repeat = 1;
int skipargs = 0;
char *endptr = NULL;
@@ -2759,6 +2907,11 @@ static void repl(void) {
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
+
+ if (config.pubsub_mode) {
+ cliWaitForMessagesOrStdin();
+ }
+
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
@@ -2799,6 +2952,13 @@ static int noninteractive(int argc, char **argv) {
retval = issueCommand(argc, sds_args);
sdsfreesplitres(sds_args, argc);
+ while (config.pubsub_mode) {
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ }
return retval == REDIS_OK ? 0 : 1;
}
@@ -9011,6 +9171,7 @@ int main(int argc, char **argv) {
config.eval_ldb_sync = 0;
config.enable_ldb_on_eval = 0;
config.last_cmd_type = -1;
+ config.last_reply = NULL;
config.verbose = 0;
config.set_errcode = 0;
config.no_auth_warning = 0;