summaryrefslogtreecommitdiff
path: root/src/redis-cli.c
diff options
context:
space:
mode:
authorViktor Söderqvist <viktor.soderqvist@est.tech>2023-03-19 11:56:54 +0100
committerGitHub <noreply@github.com>2023-03-19 12:56:54 +0200
commitbbf364a442b463f3cf0d310ad052727de68a493a (patch)
tree5adacdc523aa860019de0bb35bdd99d144eb932b /src/redis-cli.c
parentc9466b24a60df4254dd2ab4ba71048c30b49076f (diff)
downloadredis-bbf364a442b463f3cf0d310ad052727de68a493a.tar.gz
redis-cli: Accept commands in subscribed mode (#11873)
The message "Reading messages... (press Ctrl-C to quit)" is replaced by "Reading messages... (press Ctrl-C to quit or any key to type command)". This allows users to subscribe to more channels, to try out UNSUBSCRIBE and to combine pubsub with other features such as push messages from client tracking. The "Reading messages" info message is displayed in the bottom of the output in a distinct style and moves downward as more messages appear. When any key is pressed, the info message is replaced by the prompt with for entering commands. After entering a command and the reply is displayed, the "Reading messages" info messages appears again. This is added to the repl loop in redis-cli and in the corresponding place for non-interactive mode. An indication "(subscribed mode)" is included in the prompt when entering commands in subscribed mode. Also: * Fixes a problem that UNSUBSCRIBE hanged when used with RESP3 and push callback, without first entering subscribe mode. It hanged because UNSUBSCRIBE gets one or more push replies but no in-band reply. * Exit subscribed mode after RESET.
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r--src/redis-cli.c263
1 files changed, 212 insertions, 51 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 964ae59ef..d8e6b966a 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -45,6 +45,7 @@
#include <fcntl.h>
#include <limits.h>
#include <math.h>
+#include <termios.h>
#include <hiredis.h>
#ifdef USE_OPENSSL
@@ -172,6 +173,9 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
int *spectrum_palette;
int spectrum_palette_size;
+static int orig_termios_saved = 0;
+static struct termios orig_termios; /* To restore terminal at exit.*/
+
/* Dict Helpers */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(dict *d, const void *key1,
@@ -267,12 +271,14 @@ static struct config {
int eval_ldb_end; /* Lua debugging session ended. */
int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
int last_cmd_type;
+ redisReply *last_reply;
int verbose;
int set_errcode;
clusterManagerCommand cluster_manager_command;
int no_auth_warning;
- int resp2;
+ int resp2; /* value of 1: specified explicitly with option -2 */
int resp3; /* value of 1: specified explicitly, value of 2: implicit like --json option */
+ int current_resp3; /* 1 if we have RESP3 right now in the current connection. */
int in_multi;
int pre_multi_dbnum;
} config;
@@ -335,6 +341,9 @@ static void cliRefreshPrompt(void) {
if (config.in_multi)
prompt = sdscatlen(prompt,"(TX)",4);
+ if (config.pubsub_mode)
+ prompt = sdscatfmt(prompt,"(subscribed mode)");
+
/* Copy the prompt in the static buffer. */
prompt = sdscatlen(prompt,"> ",2);
snprintf(config.prompt,sizeof(config.prompt),"%s",prompt);
@@ -1017,6 +1026,29 @@ static void freeHintsCallback(void *ptr) {
}
/*------------------------------------------------------------------------------
+ * TTY manipulation
+ *--------------------------------------------------------------------------- */
+
+/* Restore terminal if we've changed it. */
+void cliRestoreTTY(void) {
+ if (orig_termios_saved)
+ tcsetattr(STDIN_FILENO, TCSANOW, &orig_termios);
+}
+
+/* Put the terminal in "press any key" mode */
+static void cliPressAnyKeyTTY(void) {
+ if (!isatty(STDIN_FILENO)) return;
+ if (!orig_termios_saved) {
+ if (tcgetattr(STDIN_FILENO, &orig_termios) == -1) return;
+ atexit(cliRestoreTTY);
+ orig_termios_saved = 1;
+ }
+ struct termios mode = orig_termios;
+ mode.c_lflag &= ~(ECHO | ICANON); /* echoing off, canonical off */
+ tcsetattr(STDIN_FILENO, TCSANOW, &mode);
+}
+
+/*------------------------------------------------------------------------------
* Networking / parsing
*--------------------------------------------------------------------------- */
@@ -1088,6 +1120,7 @@ static int cliSwitchProto(void) {
}
}
freeReplyObject(reply);
+ config.current_resp3 = 1;
return result;
}
@@ -1147,6 +1180,9 @@ static int cliConnect(int flags) {
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
+ /* State of the current connection. */
+ config.current_resp3 = 0;
+
/* Do AUTH, select the right DB, switch to RESP3 if needed. */
if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK)
return REDIS_ERR;
@@ -1309,6 +1345,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
char numsep;
if (r->type == REDIS_REPLY_SET) numsep = '~';
else if (r->type == REDIS_REPLY_MAP) numsep = '#';
+ /* TODO: this would be a breaking change for scripts, do that in a major version. */
+ /* else if (r->type == REDIS_REPLY_PUSH) numsep = '>'; */
else numsep = ')';
snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);
@@ -1351,6 +1389,25 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
return out;
}
+/* Returns 1 if the reply is a pubsub pushed reply. */
+int isPubsubPush(redisReply *r) {
+ if (r == NULL ||
+ r->type != (config.current_resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) ||
+ r->elements < 3 ||
+ r->element[0]->type != REDIS_REPLY_STRING)
+ {
+ return 0;
+ }
+ char *str = r->element[0]->str;
+ size_t len = r->element[0]->len;
+ /* Check if it is [p|s][un]subscribe or [p|s]message, but even simpler, we
+ * just check that it ends with "message" or "subscribe". */
+ return ((len >= strlen("message") &&
+ !strcmp(str + len - strlen("message"), "message")) ||
+ (len >= strlen("subscribe") &&
+ !strcmp(str + len - strlen("subscribe"), "subscribe")));
+}
+
int isColorTerm(void) {
char *t = getenv("TERM");
return t != NULL && strstr(t,"xterm") != NULL;
@@ -1656,6 +1713,11 @@ static int cliReadReply(int output_raw_strings) {
sds out = NULL;
int output = 1;
+ if (config.last_reply) {
+ freeReplyObject(config.last_reply);
+ config.last_reply = NULL;
+ }
+
if (redisGetReply(context,&_reply) != REDIS_OK) {
if (config.blocking_state_aborted) {
config.blocking_state_aborted = 0;
@@ -1682,7 +1744,7 @@ static int cliReadReply(int output_raw_strings) {
return REDIS_ERR; /* avoid compiler warning */
}
- reply = (redisReply*)_reply;
+ config.last_reply = reply = (redisReply*)_reply;
config.last_cmd_type = reply->type;
@@ -1731,15 +1793,78 @@ static int cliReadReply(int output_raw_strings) {
fflush(stdout);
sdsfree(out);
}
- freeReplyObject(reply);
return REDIS_OK;
}
+/* Simultaneously wait for pubsub messages from redis and input on stdin. */
+static void cliWaitForMessagesOrStdin() {
+ int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) ||
+ getenv("FAKETTY"));
+ int use_color = show_info && isColorTerm();
+ cliPressAnyKeyTTY();
+ while (config.pubsub_mode) {
+ /* First check if there are any buffered replies. */
+ redisReply *reply;
+ do {
+ if (redisGetReplyFromReader(context, (void **)&reply) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ if (reply) {
+ sds out = cliFormatReply(reply, config.output, 0);
+ fwrite(out,sdslen(out),1,stdout);
+ fflush(stdout);
+ sdsfree(out);
+ }
+ } while(reply);
+
+ /* Wait for input, either on the Redis socket or on stdin. */
+ struct timeval tv;
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(context->fd, &readfds);
+ FD_SET(STDIN_FILENO, &readfds);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ if (show_info) {
+ if (use_color) printf("\033[1;90m"); /* Bold, bright color. */
+ printf("Reading messages... (press Ctrl-C to quit or any key to type command)\r");
+ if (use_color) printf("\033[0m"); /* Reset color. */
+ fflush(stdout);
+ }
+ select(context->fd + 1, &readfds, NULL, NULL, &tv);
+ if (show_info) {
+ printf("\033[K"); /* Erase current line */
+ fflush(stdout);
+ }
+ if (config.blocking_state_aborted) {
+ /* Ctrl-C pressed */
+ config.blocking_state_aborted = 0;
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ break;
+ } else if (FD_ISSET(context->fd, &readfds)) {
+ /* Message from Redis */
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ } else if (FD_ISSET(STDIN_FILENO, &readfds)) {
+ /* Any key pressed */
+ break;
+ }
+ }
+ cliRestoreTTY();
+}
+
static int cliSendCommand(int argc, char **argv, long repeat) {
char *command = argv[0];
size_t *argvlen;
int j, output_raw;
- int is_unsubscribe_command = 0; /* Is it an unsubscribe related command? */
if (context == NULL) return REDIS_ERR;
@@ -1775,12 +1900,12 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
- if (!strcasecmp(command,"subscribe") ||
- !strcasecmp(command,"psubscribe") ||
- !strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
- if (!strcasecmp(command,"unsubscribe") ||
- !strcasecmp(command,"punsubscribe") ||
- !strcasecmp(command,"sunsubscribe")) is_unsubscribe_command = 1;
+ int is_subscribe = (!strcasecmp(command, "subscribe") ||
+ !strcasecmp(command, "psubscribe") ||
+ !strcasecmp(command, "ssubscribe"));
+ int is_unsubscribe = (!strcasecmp(command, "unsubscribe") ||
+ !strcasecmp(command, "punsubscribe") ||
+ !strcasecmp(command, "sunsubscribe"));
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;
@@ -1812,21 +1937,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
while(repeat < 0 || repeat-- > 0) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
- if (is_unsubscribe_command) {
- /* In unsubscribe related commands, we need to read the specified
- * number of replies according to the number of parameters. */
- argc--; /* Skip the command */
- do {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout);
- } while(--argc);
- zfree(argvlen);
- continue;
- }
-
if (config.monitor_mode) {
do {
if (cliReadReply(output_raw) != REDIS_OK) {
@@ -1843,27 +1953,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_OK;
}
- if (config.pubsub_mode) {
- if (config.output != OUTPUT_RAW)
- printf("Reading messages... (press Ctrl-C to quit)\n");
-
+ int num_expected_pubsub_push = 0;
+ if (is_subscribe || is_unsubscribe) {
+ /* When a push callback is set, redisGetReply (hiredis) loops until
+ * an in-band message is received, but these commands are confirmed
+ * using push replies only. There is one push reply per channel if
+ * channels are specified, otherwise at least one. */
+ num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
-
- while (config.pubsub_mode) {
- if (cliReadReply(output_raw) != REDIS_OK) {
- cliPrintContextError();
- exit(1);
- }
- fflush(stdout); /* Make it grep friendly */
- if (!config.pubsub_mode || config.last_cmd_type == REDIS_REPLY_ERROR) {
- if (config.push_output) {
- redisSetPushCallback(context, cliPushHandler);
- }
- config.pubsub_mode = 0;
- }
- }
- continue;
}
if (config.slave_mode) {
@@ -1874,10 +1972,35 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_ERR; /* Error = slaveMode lost connection to master */
}
- if (cliReadReply(output_raw) != REDIS_OK) {
- zfree(argvlen);
- return REDIS_ERR;
- } else {
+ /* Read response, possibly skipping pubsub/push messages. */
+ while (1) {
+ if (cliReadReply(output_raw) != REDIS_OK) {
+ zfree(argvlen);
+ return REDIS_ERR;
+ }
+ fflush(stdout);
+ if (config.pubsub_mode || num_expected_pubsub_push > 0) {
+ if (isPubsubPush(config.last_reply)) {
+ if (num_expected_pubsub_push > 0 &&
+ !strcasecmp(config.last_reply->element[0]->str, command))
+ {
+ /* This pushed message confirms the
+ * [p|s][un]subscribe command. */
+ if (is_subscribe && !config.pubsub_mode) {
+ config.pubsub_mode = 1;
+ cliRefreshPrompt();
+ }
+ if (--num_expected_pubsub_push > 0) {
+ continue; /* We need more of these. */
+ }
+ } else {
+ continue; /* Skip this pubsub message. */
+ }
+ } else if (config.last_reply->type == REDIS_REPLY_PUSH) {
+ continue; /* Skip other push message. */
+ }
+ }
+
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,"select") && argc == 2 &&
config.last_cmd_type != REDIS_REPLY_ERROR)
@@ -1911,9 +2034,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.in_multi = 0;
config.dbnum = 0;
config.conn_info.input_dbnum = 0;
- config.resp3 = 0;
+ config.current_resp3 = 0;
+ if (config.pubsub_mode && config.push_output) {
+ redisSetPushCallback(context, cliPushHandler);
+ }
+ config.pubsub_mode = 0;
cliRefreshPrompt();
+ } else if (!strcasecmp(command,"hello")) {
+ if (config.last_cmd_type == REDIS_REPLY_MAP) {
+ config.current_resp3 = 1;
+ } else if (config.last_cmd_type == REDIS_REPLY_ARRAY) {
+ config.current_resp3 = 0;
+ }
+ } else if ((is_subscribe || is_unsubscribe) && !config.pubsub_mode) {
+ /* We didn't enter pubsub mode. Restore push callback. */
+ if (config.push_output)
+ redisSetPushCallback(context, cliPushHandler);
}
+
+ break;
}
if (config.cluster_reissue_command){
/* If we need to reissue the command, break to prevent a
@@ -2664,8 +2803,17 @@ static void repl(void) {
}
cliRefreshPrompt();
- while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
- if (line[0] != '\0') {
+ while(1) {
+ line = linenoise(context ? config.prompt : "not connected> ");
+ if (line == NULL) {
+ /* ^C, ^D or similar. */
+ if (config.pubsub_mode) {
+ config.pubsub_mode = 0;
+ if (cliConnect(CC_FORCE) == REDIS_OK)
+ continue;
+ }
+ break;
+ } else if (line[0] != '\0') {
long repeat = 1;
int skipargs = 0;
char *endptr = NULL;
@@ -2759,6 +2907,11 @@ static void repl(void) {
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
+
+ if (config.pubsub_mode) {
+ cliWaitForMessagesOrStdin();
+ }
+
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
@@ -2799,6 +2952,13 @@ static int noninteractive(int argc, char **argv) {
retval = issueCommand(argc, sds_args);
sdsfreesplitres(sds_args, argc);
+ while (config.pubsub_mode) {
+ if (cliReadReply(0) != REDIS_OK) {
+ cliPrintContextError();
+ exit(1);
+ }
+ fflush(stdout);
+ }
return retval == REDIS_OK ? 0 : 1;
}
@@ -9011,6 +9171,7 @@ int main(int argc, char **argv) {
config.eval_ldb_sync = 0;
config.enable_ldb_on_eval = 0;
config.last_cmd_type = -1;
+ config.last_reply = NULL;
config.verbose = 0;
config.set_errcode = 0;
config.no_auth_warning = 0;