summaryrefslogtreecommitdiff
path: root/src/redis-cli.c
diff options
context:
space:
mode:
authorBinbin <binloveplay1314@qq.com>2023-01-04 17:13:22 +0800
committerGitHub <noreply@github.com>2023-01-04 11:13:22 +0200
commit4ef4c4a686b6edaf825635d942b698349f67bcc6 (patch)
tree3be39ccccb992882870d9fb113169d684cef2a46 /src/redis-cli.c
parentc8052122a2af9b85124a8697488a0c44d66777b8 (diff)
downloadredis-4ef4c4a686b6edaf825635d942b698349f67bcc6.tar.gz
Make redis-cli support PSYNC command (#11647)
The current redis-cli does not support the real PSYNC command, the older version of redis-cli can support PSYNC is because that we actually issue the SYNC command instead of PSYNC, so it act like SYNC (always full-sync). Noted that in this case we will send the SYNC first (triggered by sendSync), then send the PSYNC (the one in redis-cli input). Didn't bother to find which version that the order changed, we send PSYNC first (the one in redis-cli input), and then send the SYNC (the one triggered by sendSync). So even full-sync is not working anymore, and it will result this output (mentioned in issue #11246): ``` psync dummy 0 Entering replica output mode... (press Ctrl-C to quit) SYNC with master, discarding bytes of bulk transfer until EOF marker... Error reading RDB payload while SYNCing ``` This PR adds PSYNC support to redis-cli, which can handle +FULLRESYNC and +CONTINUE responses, and some examples will follow. Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r--src/redis-cli.c96
1 files changed, 74 insertions, 22 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 147231540..c8576702c 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -284,7 +284,7 @@ static struct pref {
static volatile sig_atomic_t force_cancel_loop = 0;
static void usage(int err);
-static void slaveMode(void);
+static void slaveMode(int send_sync);
char *redisGitSHA1(void);
char *redisGitDirty(void);
static int cliConnect(int flags);
@@ -1848,7 +1848,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (config.slave_mode) {
printf("Entering replica output mode... (press Ctrl-C to quit)\n");
- slaveMode();
+ slaveMode(0);
config.slave_mode = 0;
zfree(argvlen);
return REDIS_ERR; /* Error = slaveMode lost connection to master */
@@ -7695,8 +7695,15 @@ static ssize_t readConn(redisContext *c, char *buf, size_t len)
/* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB().
- * returns 0 in case an EOF marker is used. */
-unsigned long long sendSync(redisContext *c, char *out_eof) {
+ *
+ * send_sync if 1 means we will explicitly send SYNC command. If 0 means
+ * we will not send SYNC command, will send the command that in c->obuf.
+ *
+ * Returns the size of the RDB payload to read, or 0 in case an EOF marker is used and the size
+ * is unknown, also returns 0 in case a PSYNC +CONTINUE was found (no RDB payload).
+ *
+ * The out_full_mode parameter if 1 means this is a full sync, if 0 means this is partial mode. */
+unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int *out_full_mode) {
/* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed
@@ -7704,10 +7711,20 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
char buf[4096], *p;
ssize_t nread;
- /* Send the SYNC command. */
- if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
- fprintf(stderr,"Error writing to master\n");
- exit(1);
+ if (out_full_mode) *out_full_mode = 1;
+
+ if (send_sync) {
+ /* Send the SYNC command. */
+ if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
+ fprintf(stderr,"Error writing to master\n");
+ exit(1);
+ }
+ } else {
+ /* We have written the command into c->obuf before. */
+ if (cliWriteConn(c, "", 0) != 0) {
+ fprintf(stderr,"Error writing to master\n");
+ exit(1);
+ }
}
/* Read $<payload>\r\n, making sure to read just up to "\n" */
@@ -7720,12 +7737,41 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
}
if (*p == '\n' && p != buf) break;
if (*p != '\n') p++;
+ if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
}
*p = '\0';
if (buf[0] == '-') {
fprintf(stderr, "SYNC with master failed: %s\n", buf);
exit(1);
}
+
+ /* Handling PSYNC responses.
+ * Read +FULLRESYNC <replid> <offset>\r\n, after that is the $<payload> or the $EOF:<40 bytes delimiter>
+ * Read +CONTINUE <replid>\r\n or +CONTINUE\r\n, after that is the command stream */
+ if (!strncmp(buf, "+FULLRESYNC", 11) ||
+ !strncmp(buf, "+CONTINUE", 9))
+ {
+ int sync_partial = !strncmp(buf, "+CONTINUE", 9);
+ fprintf(stderr, "PSYNC replied %s\n", buf);
+ p = buf;
+ while(1) {
+ nread = readConn(c,p,1);
+ if (nread <= 0) {
+ fprintf(stderr,"Error reading bulk length while PSYNCing\n");
+ exit(1);
+ }
+ if (*p == '\n' && p != buf) break;
+ if (*p != '\n') p++;
+ if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
+ }
+ *p = '\0';
+
+ if (sync_partial) {
+ if (out_full_mode) *out_full_mode = 0;
+ return 0;
+ }
+ }
+
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
return 0;
@@ -7733,33 +7779,39 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
return strtoull(buf+1,NULL,10);
}
-static void slaveMode(void) {
+static void slaveMode(int send_sync) {
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
- unsigned long long payload = sendSync(context,eofmark);
+ static int out_full_mode;
+ unsigned long long payload = sendSync(context, send_sync, eofmark, &out_full_mode);
char buf[1024];
int original_output = config.output;
+ char *info = out_full_mode ? "Full resync" : "Partial resync";
- if (payload == 0) {
+ if (out_full_mode == 1 && payload == 0) {
+ /* SYNC with EOF marker or PSYNC +FULLRESYNC with EOF marker. */
payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
- fprintf(stderr,"SYNC with master, discarding "
- "bytes of bulk transfer until EOF marker...\n");
- } else {
- fprintf(stderr,"SYNC with master, discarding %llu "
- "bytes of bulk transfer...\n", payload);
+ fprintf(stderr, "%s with master, discarding "
+ "bytes of bulk transfer until EOF marker...\n", info);
+ } else if (out_full_mode == 1 && payload != 0) {
+ /* SYNC without EOF marker or PSYNC +FULLRESYNC. */
+ fprintf(stderr, "%s with master, discarding %llu "
+ "bytes of bulk transfer...\n", info, payload);
+ } else if (out_full_mode == 0 && payload == 0) {
+ /* PSYNC +CONTINUE (no RDB payload). */
+ fprintf(stderr, "%s with master...\n", info);
}
-
/* Discard the payload. */
while(payload) {
ssize_t nread;
nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
if (nread <= 0) {
- fprintf(stderr,"Error reading RDB payload while SYNCing\n");
+ fprintf(stderr,"Error reading RDB payload while %sing\n", info);
exit(1);
}
payload -= nread;
@@ -7780,12 +7832,12 @@ static void slaveMode(void) {
if (usemark) {
unsigned long long offset = ULLONG_MAX - payload;
- fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
+ fprintf(stderr,"%s done after %llu bytes. Logging commands from master.\n", info, offset);
/* put the slave online */
sleep(1);
sendReplconf("ACK", "0");
} else
- fprintf(stderr,"SYNC done. Logging commands from master.\n");
+ fprintf(stderr,"%s done. Logging commands from master.\n", info);
/* Now we can use hiredis to read the incoming protocol. */
config.output = OUTPUT_CSV;
@@ -7814,7 +7866,7 @@ static void getRDB(clusterManagerNode *node) {
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
- unsigned long long payload = sendSync(s, eofmark);
+ unsigned long long payload = sendSync(s, 1, eofmark, NULL);
char buf[4096];
if (payload == 0) {
@@ -9027,7 +9079,7 @@ int main(int argc, char **argv) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
sendReplconf("rdb-filter-only", "");
- slaveMode();
+ slaveMode(1);
}
/* Get RDB/functions mode. */