summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/linenoise/linenoise.c9
-rw-r--r--src/redis-cli.c263
-rw-r--r--tests/integration/redis-cli.tcl82
3 files changed, 298 insertions, 56 deletions
diff --git a/deps/linenoise/linenoise.c b/deps/linenoise/linenoise.c
index 1b01622c5..dd86abe86 100644
--- a/deps/linenoise/linenoise.c
+++ b/deps/linenoise/linenoise.c
@@ -163,6 +163,7 @@ enum KEY_ACTION{
CTRL_F = 6, /* Ctrl-f */
CTRL_H = 8, /* Ctrl-h */
TAB = 9, /* Tab */
+ NL = 10, /* Enter typed before raw mode was enabled */
CTRL_K = 11, /* Ctrl+k */
CTRL_L = 12, /* Ctrl+l */
ENTER = 13, /* Enter */
@@ -256,8 +257,8 @@ static int enableRawMode(int fd) {
* We want read to return every single byte, without timeout. */
raw.c_cc[VMIN] = 1; raw.c_cc[VTIME] = 0; /* 1 byte, no timer */
- /* put terminal in raw mode after flushing */
- if (tcsetattr(fd,TCSAFLUSH,&raw) < 0) goto fatal;
+ /* put terminal in raw mode */
+ if (tcsetattr(fd,TCSANOW,&raw) < 0) goto fatal;
rawmode = 1;
return 0;
@@ -268,7 +269,7 @@ fatal:
static void disableRawMode(int fd) {
/* Don't even check the return value as it's too late. */
- if (rawmode && tcsetattr(fd,TCSAFLUSH,&orig_termios) != -1)
+ if (rawmode && tcsetattr(fd,TCSANOW,&orig_termios) != -1)
rawmode = 0;
}
@@ -840,6 +841,8 @@ static int linenoiseEdit(int stdin_fd, int stdout_fd, char *buf, size_t buflen,
}
switch(c) {
+ case NL: /* enter, typed before raw mode was enabled */
+ break;
case ENTER: /* enter */
history_len--;
free(history[history_len]);
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 964ae59ef..d8e6b966a 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -45,6 +45,7 @@
#include <fcntl.h>
#include <limits.h>
#include <math.h>
+#include <termios.h>
#include <hiredis.h>
#ifdef USE_OPENSSL
@@ -172,6 +173,9 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
int *spectrum_palette;
int spectrum_palette_size;
+static int orig_termios_saved = 0;
+static struct termios orig_termios; /* To restore terminal at exit.*/
+
/* Dict Helpers */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(dict *d, const void *key1,
@@ -267,12 +271,14 @@ static struct config {
int eval_ldb_end; /* Lua debugging session ended. */
int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
int last_cmd_type;
+ redisReply *last_reply;
int verbose;
int set_errcode;
clusterManagerCommand cluster_manager_command;
int no_auth_warning;
- int resp2;
+ int resp2; /* value of 1: specified explicitly with option -2 */
int resp3; /* value of 1: specified explicitly, value of 2: implicit like --json option */
+ int current_resp3; /* 1 if we have RESP3 right now in the current connection. */
int in_multi;
int pre_multi_dbnum;
} config;
@@ -335,6 +341,9 @@ static void cliRefreshPrompt(void) {
if (config.in_multi)
prompt = sdscatlen(prompt,"(TX)",4);
+ if (config.pubsub_mode)
+ prompt = sdscatfmt(prompt,"(subscribed mode)");
+
/* Copy the prompt in the static buffer. */
prompt = sdscatlen(prompt,"> ",2);
snprintf(config.prompt,sizeof(config.prompt),"%s",prompt);
@@ -1017,6 +1026,29 @@ static void freeHintsCallback(void *ptr) {
}
/*------------------------------------------------------------------------------
+ * TTY manipulation
+ *--------------------------------------------------------------------------- */
+
+/* Restore terminal if we've changed it. */
+void cliRestoreTTY(void) {
+ if (orig_termios_saved)
+ tcsetattr(STDIN_FILENO, TCSANOW, &orig_termios);
+}
+
+/* Put the terminal in "press any key" mode */
+static void cliPressAnyKeyTTY(void) {
+ if (!isatty(STDIN_FILENO)) return;
+ if (!orig_termios_saved) {
+ if (tcgetattr(STDIN_FILENO, &orig_termios) == -1) return;
+ atexit(cliRestoreTTY);
+ orig_termios_saved = 1;
+ }
+ struct termios mode = orig_termios;
+ mode.c_lflag &= ~(ECHO | ICANON); /* echoing off, canonical off */
+ tcsetattr(STDIN_FILENO, TCSANOW, &mode);
+}
+
+/*------------------------------------------------------------------------------
* Networking / parsing
*--------------------------------------------------------------------------- */
@@ -1088,6 +1120,7 @@ static int cliSwitchProto(void) {
}
}
freeReplyObject(reply);
+ config.current_resp3 = 1;
return result;
}
@@ -1147,6 +1180,9 @@ static int cliConnect(int flags) {
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
+ /* State of the current connection. */
+ config.current_resp3 = 0;
+
/* Do AUTH, select the right DB, switch to RESP3 if needed. */
if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK)
return REDIS_ERR;
@@ -1309,6 +1345,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
char numsep;
if (r->type == REDIS_REPLY_SET) numsep = '~';
else if (r->type == REDIS_REPLY_MAP) numsep = '#';
+ /* TODO: this would be a breaking change for scripts, do that in a major version. */
+ /* else if (r->type == REDIS_REPLY_PUSH) numsep = '>'; */
else numsep = ')';
snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);
@@ -1351,6 +1389,25 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
return out;
}
+/* Returns 1 if the reply is a pubsub pushed reply. */
+int isPubsubPush(redisReply *r) {
+ if (r == NULL ||
+ r->type != (config.current_resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) ||
+ r->elements < 3 ||
+ r->element[0]->type != REDIS_REPLY_STRING)
+ {
+ return 0;
+ }
+ char *str = r->element[0]->str;
+ size_t len = r->element[0]->len;
+ /* Check if it is [p|s][un]subscribe or [p|s]message, but even simpler, we
+ * just check that it ends with "message" or "subscribe". */
+ return ((len >= strlen("message") &&
+ !strcmp(str + len - strlen("message"), "message")) ||
+ (len >= strlen("subscribe") &&
+ !strcmp(str + len - strlen("subscribe"), "subscribe")));
+}
+
int isColorTerm(void) {
char *t = getenv("TERM");
return t != NULL && strstr(t,"xterm") != NULL;
@@ -1656,6 +1713,11 @@ static int cliReadReply(int output_raw_strings) {
sds out = NULL;
int output = 1;
+ if (config.last_reply) {
+ freeReplyObject(config.last_reply);
+ config.last_reply = NULL;
+ }
+
if (redisGetReply(context,&_reply) != REDIS_OK) {
if (config.blocking_state_aborted) {
config.blocking_state_aborted = 0;
@@ -1682,7 +1744,7 @@ static int cliReadReply(int output_raw_strings) {
return REDIS_ERR; /* avoid compiler warning */
}
- reply = (redisReply*)_reply;
+ config.last_reply = reply = (redisReply*)_reply;
config.last_cmd_type = reply->type;
@@ -1731,15 +1793,78 @@ static int cliReadReply(int output_raw_strings) {
fflush(stdout);
sdsfree(out);
}
- freeReplyObject(reply);
return REDIS_OK;
}
+/* Simultaneously wait for pubsub messages from redis and input on stdin. */
+static void cliWaitForMessagesOrStdin() {
+ int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) ||
+ getenv("FAKETTY"));
+ int use_color = show_info && isColorTerm();
+ cliPressAnyKeyTTY();
+ while (config.pubsub_mode) {
+ /* First check if there are any buffered replies. */
+ redisReply *reply;
+ do {
+ if (redisGetReplyFromReader(context, (void **)&reply) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ if (reply) {
+ sds out = cliFormatReply(reply, config.output, 0);
+ fwrite(out,sdslen(out),1,stdout);
+ fflush(stdout);
+ sdsfree(out);
+ }
+ } while(reply);
+
+ /* Wait for input, either on the Redis socket or on stdin. */
+ struct timeval tv;
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(context->fd, &readfds);
+ FD_SET(STDIN_FILENO, &readfds);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ if (show_info) {
+ if (use_color) printf("\033[1;90m"); /* Bold, bright color. */
+ printf("Reading messages... (press Ctrl-C to quit or any key to type command)\r");
+ if (use_color) printf("\033[0m"); /* Reset color. */
+ fflush(stdout);
+ }
+ select(context->fd + 1, &readfds, NULL, NULL, &tv);
+ if (show_info) {
+ printf("\033[K"); /* Erase current line */
+ fflush(stdout);
+ }
+ if (config.blocking_state_aborted) {
+ /* Ctrl-C pressed */
+ config.blocking_state_aborted = 0;
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ break;
+ } else if (FD_ISSET(context->fd, &readfds)) {
+ /* Message from Redis */
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ } else if (FD_ISSET(STDIN_FILENO, &readfds)) {
+ /* Any key pressed */
+ break;
+ }
+ }
+ cliRestoreTTY();
+}
+
static int cliSendCommand(int argc, char **argv, long repeat) {
char *command = argv[0];
size_t *argvlen;
int j, output_raw;
- int is_unsubscribe_command = 0; /* Is it an unsubscribe related command? */
if (context == NULL) return REDIS_ERR;
@@ -1775,12 +1900,12 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
- if (!strcasecmp(command,"subscribe") ||
- !strcasecmp(command,"psubscribe") ||
- !strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
- if (!strcasecmp(command,"unsubscribe") ||
- !strcasecmp(command,"punsubscribe") ||
- !strcasecmp(command,"sunsubscribe")) is_unsubscribe_command = 1;
+ int is_subscribe = (!strcasecmp(command, "subscribe") ||
+ !strcasecmp(command, "psubscribe") ||
+ !strcasecmp(command, "ssubscribe"));
+ int is_unsubscribe = (!strcasecmp(command, "unsubscribe") ||
+ !strcasecmp(command, "punsubscribe") ||
+ !strcasecmp(command, "sunsubscribe"));
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;
@@ -1812,21 +1937,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
while(repeat < 0 || repeat-- > 0) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
- if (is_unsubscribe_command) {
- /* In unsubscribe related commands, we need to read the specified
- * number of replies according to the number of parameters. */
- argc--; /* Skip the command */
- do {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout);
- } while(--argc);
- zfree(argvlen);
- continue;
- }
-
if (config.monitor_mode) {
do {
if (cliReadReply(output_raw) != REDIS_OK) {
@@ -1843,27 +1953,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_OK;
}
- if (config.pubsub_mode) {
- if (config.output != OUTPUT_RAW)
- printf("Reading messages... (press Ctrl-C to quit)\n");
-
+ int num_expected_pubsub_push = 0;
+ if (is_subscribe || is_unsubscribe) {
+ /* When a push callback is set, redisGetReply (hiredis) loops until
+ * an in-band message is received, but these commands are confirmed
+ * using push replies only. There is one push reply per channel if
+ * channels are specified, otherwise at least one. */
+ num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
-
- while (config.pubsub_mode) {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout); /* Make it grep friendly */
- if (!config.pubsub_mode || config.last_cmd_type == REDIS_REPLY_ERROR) {
- if (config.push_output) {
- redisSetPushCallback(context, cliPushHandler);
- }
- config.pubsub_mode = 0;
- }
- }
- continue;
}
if (config.slave_mode) {
@@ -1874,10 +1972,35 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_ERR; /* Error = slaveMode lost connection to master */
}
- if (cliReadReply(output_raw) != REDIS_OK) {
- zfree(argvlen);
- return REDIS_ERR;
- } else {
+ /* Read response, possibly skipping pubsub/push messages. */
+ while (1) {
+ if (cliReadReply(output_raw) != REDIS_OK) {
+ zfree(argvlen);
+ return REDIS_ERR;
+ }
+ fflush(stdout);
+ if (config.pubsub_mode || num_expected_pubsub_push > 0) {
+ if (isPubsubPush(config.last_reply)) {
+ if (num_expected_pubsub_push > 0 &&
+ !strcasecmp(config.last_reply->element[0]->str, command))
+ {
+ /* This pushed message confirms the
+ * [p|s][un]subscribe command. */
+ if (is_subscribe && !config.pubsub_mode) {
+ config.pubsub_mode = 1;
+ cliRefreshPrompt();
+ }
+ if (--num_expected_pubsub_push > 0) {
+ continue; /* We need more of these. */
+ }
+ } else {
+ continue; /* Skip this pubsub message. */
+ }
+ } else if (config.last_reply->type == REDIS_REPLY_PUSH) {
+ continue; /* Skip other push message. */
+ }
+ }
+
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,"select") && argc == 2 &&
config.last_cmd_type != REDIS_REPLY_ERROR)
@@ -1911,9 +2034,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.in_multi = 0;
config.dbnum = 0;
config.conn_info.input_dbnum = 0;
- config.resp3 = 0;
+ config.current_resp3 = 0;
+ if (config.pubsub_mode && config.push_output) {
+ redisSetPushCallback(context, cliPushHandler);
+ }
+ config.pubsub_mode = 0;
cliRefreshPrompt();
+ } else if (!strcasecmp(command,"hello")) {
+ if (config.last_cmd_type == REDIS_REPLY_MAP) {
+ config.current_resp3 = 1;
+ } else if (config.last_cmd_type == REDIS_REPLY_ARRAY) {
+ config.current_resp3 = 0;
+ }
+ } else if ((is_subscribe || is_unsubscribe) && !config.pubsub_mode) {
+ /* We didn't enter pubsub mode. Restore push callback. */
+ if (config.push_output)
+ redisSetPushCallback(context, cliPushHandler);
}
+
+ break;
}
if (config.cluster_reissue_command){
/* If we need to reissue the command, break to prevent a
@@ -2664,8 +2803,17 @@ static void repl(void) {
}
cliRefreshPrompt();
- while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
- if (line[0] != '\0') {
+ while(1) {
+ line = linenoise(context ? config.prompt : "not connected> ");
+ if (line == NULL) {
+ /* ^C, ^D or similar. */
+ if (config.pubsub_mode) {
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) == REDIS_OK)
+ continue;
+ }
+ break;
+ } else if (line[0] != '\0') {
long repeat = 1;
int skipargs = 0;
char *endptr = NULL;
@@ -2759,6 +2907,11 @@ static void repl(void) {
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
+
+ if (config.pubsub_mode) {
+ cliWaitForMessagesOrStdin();
+ }
+
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
@@ -2799,6 +2952,13 @@ static int noninteractive(int argc, char **argv) {
retval = issueCommand(argc, sds_args);
sdsfreesplitres(sds_args, argc);
+ while (config.pubsub_mode) {
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ }
return retval == REDIS_OK ? 0 : 1;
}
@@ -9011,6 +9171,7 @@ int main(int argc, char **argv) {
config.eval_ldb_sync = 0;
config.enable_ldb_on_eval = 0;
config.last_cmd_type = -1;
+ config.last_reply = NULL;
config.verbose = 0;
config.set_errcode = 0;
config.no_auth_warning = 0;
diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl
index e159fb17d..50f4a2e83 100644
--- a/tests/integration/redis-cli.tcl
+++ b/tests/integration/redis-cli.tcl
@@ -60,7 +60,7 @@ start_server {tags {"cli"}} {
# Helpers to run tests in interactive mode
proc format_output {output} {
- set _ [string trimright [regsub -all "\r" $output ""] "\n"]
+ set _ [string trimright $output "\n"]
}
proc run_command {fd cmd} {
@@ -76,6 +76,12 @@ start_server {tags {"cli"}} {
unset ::env(FAKETTY)
}
+ proc test_interactive_nontty_cli {name code} {
+ set fd [open_cli]
+ test "Interactive non-TTY CLI: $name" $code
+ close_cli $fd
+ }
+
# Helpers to run tests where stdout is not a tty
proc write_tmpfile {contents} {
set tmp [tmpfile "cli"]
@@ -142,7 +148,8 @@ start_server {tags {"cli"}} {
test_interactive_cli "INFO response should be printed raw" {
set lines [split [run_command $fd info] "\n"]
foreach line $lines {
- if {![regexp {^$|^#|^[^#:]+:} $line]} {
+ # Info lines end in \r\n, so they now end in \r.
+ if {![regexp {^\r$|^#|^[^#:]+:} $line]} {
fail "Malformed info line: $line"
}
}
@@ -186,6 +193,77 @@ start_server {tags {"cli"}} {
assert_equal "bar" [r get key]
}
+ test_interactive_cli "Subscribed mode" {
+ set reading "Reading messages... (press Ctrl-C to quit or any key to type command)\r"
+ set erase "\033\[K"; # Erases the "Reading messages..." line.
+
+ # Subscribe to some channels.
+ set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
+ set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n"
+ set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n"
+ assert_equal $sub1$sub2$sub3$reading \
+ [run_command $fd "subscribe ch1 ch2 ch3"]
+
+ # Receive pubsub message.
+ r publish ch2 hello
+ set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n"
+ assert_equal $erase$message$reading [read_cli $fd]
+
+ # Unsubscribe some.
+ set unsub1 "1) \"unsubscribe\"\n2) \"ch1\"\n3) (integer) 2\n"
+ set unsub2 "1) \"unsubscribe\"\n2) \"ch2\"\n3) (integer) 1\n"
+ assert_equal $erase$unsub1$unsub2$reading \
+ [run_command $fd "unsubscribe ch1 ch2"]
+
+ # Command forbidden in subscribed mode (RESP2).
+ set err "(error) ERR Can't execute 'get': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context\n"
+ assert_equal $erase$err$reading [run_command $fd "get k"]
+
+ # Command allowed in subscribed mode.
+ set pong "1) \"pong\"\n2) \"\"\n"
+ assert_equal $erase$pong$reading [run_command $fd "ping"]
+
+ # Reset exits subscribed mode.
+ assert_equal ${erase}RESET [run_command $fd "reset"]
+ assert_equal PONG [run_command $fd "ping"]
+
+ # Check TTY output of push messages in RESP3 has ")" prefix (to be changed to ">" in the future).
+ assert_match "1#*" [run_command $fd "hello 3"]
+ set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
+ assert_equal $sub1$reading \
+ [run_command $fd "subscribe ch1"]
+ }
+
+ test_interactive_nontty_cli "Subscribed mode" {
+ # Raw output and no "Reading messages..." info message.
+ # Use RESP3 in this test case.
+ assert_match {*proto 3*} [run_command $fd "hello 3"]
+
+ # Subscribe to some channels.
+ set sub1 "subscribe\nch1\n1"
+ set sub2 "subscribe\nch2\n2"
+ assert_equal $sub1\n$sub2 \
+ [run_command $fd "subscribe ch1 ch2"]
+
+ assert_equal OK [run_command $fd "client tracking on"]
+ assert_equal OK [run_command $fd "set k 42"]
+ assert_equal 42 [run_command $fd "get k"]
+
+ # Interleaving invalidate and pubsub messages.
+ r publish ch1 hello
+ r del k
+ r publish ch2 world
+ set message1 "message\nch1\nhello"
+ set invalidate "invalidate\nk"
+ set message2 "message\nch2\nworld"
+ assert_equal $message1\n$invalidate\n$message2\n [read_cli $fd]
+
+ # Unsubscribe all.
+ set unsub1 "unsubscribe\nch1\n1"
+ set unsub2 "unsubscribe\nch2\n0"
+ assert_equal $unsub1\n$unsub2 [run_command $fd "unsubscribe ch1 ch2"]
+ }
+
test_tty_cli "Status reply" {
assert_equal "OK" [run_cli set key bar]
assert_equal "bar" [r get key]