summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-12-11 13:48:41 +0100
committerantirez <antirez@gmail.com>2015-12-13 10:17:26 +0100
commitc460458d5766fdfd06dee5cb05f886681813209d (patch)
treeae5fb8b3f61f0a15f5cf4811faecd4cfa39cdf64
parent5912afc9d4aa85f2f66f7f318a22fe4f1b4be9c3 (diff)
downloadredis-c460458d5766fdfd06dee5cb05f886681813209d.tar.gz
MIGRATE: fix replies processing and argument rewriting.
We need to process replies after errors in order to delete keys successfully transferred. Also argument rewriting was fixed since it was broken in several ways. Now a fresh argument vector is created and set if we are acknowledged of at least one key.
-rw-r--r--src/cluster.c36
-rw-r--r--src/networking.c19
2 files changed, 40 insertions, 15 deletions
diff --git a/src/cluster.c b/src/cluster.c
index 3783efa70..c49fc32f3 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4742,16 +4742,22 @@ try_again:
/* Read the RESTORE replies. */
int error_from_target = 0;
+ int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
+ robj **newargv;
+
+ if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
+
for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_err;
- if (error_from_target) continue; /* Just consume the replies. */
if ((select && buf1[0] == '-') || buf2[0] == '-') {
/* On error assume that last_dbid is no longer valid. */
- cs->last_dbid = -1;
- addReplyErrorFormat(c,"Target instance replied with error: %s",
- (select && buf1[0] == '-') ? buf1+1 : buf2+1);
- error_from_target = 1;
+ if (!error_from_target) {
+ cs->last_dbid = -1;
+ addReplyErrorFormat(c,"Target instance replied with error: %s",
+ (select && buf1[0] == '-') ? buf1+1 : buf2+1);
+ error_from_target = 1;
+ }
} else {
if (!copy) {
robj *aux;
@@ -4761,17 +4767,23 @@ try_again:
signalModifiedKey(c->db,kv[j]);
server.dirty++;
- /* Translate MIGRATE as DEL for replication/AOF. */
- if (j == 0) {
- aux = createStringObject("DEL",3);
- rewriteClientCommandArgument(c,0,aux);
- decrRefCount(aux);
- }
- rewriteClientCommandArgument(c,j+1,kv[j]);
+ /* Populate the argument vector to replace the old one. */
+ newargv[del_idx++] = kv[j];
}
}
}
+ if (!copy) {
+ /* Translate MIGRATE as DEL for replication/AOF. */
+ if (del_idx > 1) {
+ newargv[0] = createStringObject("DEL",3);
+ replaceClientCommandVector(c,newargv,del_idx);
+ } else {
+ /* No key transfer acknowledged, no need to rewrite as DEL. */
+ zfree(newargv);
+ }
+ }
+
if (!error_from_target) {
/* Update the last_dbid in migrateCachedSocket and reply +OK. */
cs->last_dbid = dbid;
diff --git a/src/networking.c b/src/networking.c
index eab213060..acffd5371 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1528,15 +1528,28 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
}
/* Rewrite a single item in the command vector.
- * The new val ref count is incremented, and the old decremented. */
+ * The new val ref count is incremented, and the old decremented.
+ *
+ * It is possible to specify an argument over the current size of the
+ * argument vector: in this case the array of objects gets reallocated
+ * and c->argc set to the max value. However it's up to the caller to
+ *
+ * 1. Make sure there are no "holes" and all the arguments are set.
+ * 2. If the original argument vector was longer than the one we
+ * want to end with, it's up to the caller to set c->argc and
+ * free the no longer used objects on c->argv. */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
robj *oldval;
- if (i >= c->argc) c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
+ if (i >= c->argc) {
+ c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
+ c->argc = i+1;
+ c->argv[i] = NULL;
+ }
oldval = c->argv[i];
c->argv[i] = newval;
incrRefCount(newval);
- decrRefCount(oldval);
+ if (oldval) decrRefCount(oldval);
/* If this is the command name make sure to fix c->cmd. */
if (i == 0) {