summaryrefslogtreecommitdiff
path: root/src/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cluster.c')
-rw-r--r--src/cluster.c59
1 files changed, 33 insertions, 26 deletions
diff --git a/src/cluster.c b/src/cluster.c
index c85e3791d..505c3c757 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4776,7 +4776,7 @@ NULL
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
-void createDumpPayload(rio *payload, robj *o, robj *key) {
+void createDumpPayload(rio *payload, robj *o, rkey *key) {
unsigned char buf[2];
uint64_t crc;
@@ -4833,16 +4833,17 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
robj *o, *dumpobj;
+ rkey *key;
rio payload;
/* Check if the key is here. */
- if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+ if ((o = lookupKeyRead(c->db,c->argv[1],&key)) == NULL) {
addReplyNull(c);
return;
}
/* Create the DUMP encoded representation. */
- createDumpPayload(&payload,o,c->argv[1]);
+ createDumpPayload(&payload,o,key);
/* Transfer to the client */
dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
@@ -4893,7 +4894,7 @@ void restoreCommand(client *c) {
}
/* Make sure this key does not already exist here... */
- if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+ if (!replace && lookupKeyWrite(c->db,c->argv[1],NULL) != NULL) {
addReply(c,shared.busykeyerr);
return;
}
@@ -4925,12 +4926,12 @@ void restoreCommand(client *c) {
if (replace) dbDelete(c->db,c->argv[1]);
/* Create the key and set the TTL if any */
- dbAdd(c->db,c->argv[1],obj);
+ rkey *key = dbAdd(c->db,c->argv[1],obj);
if (ttl) {
if (!absttl) ttl+=mstime();
- setExpire(c,c->db,c->argv[1],ttl);
+ setExpire(c,c->db,key,ttl);
}
- objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
+ objectSetLRUOrLFU(key,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
@@ -5066,7 +5067,8 @@ void migrateCommand(client *c) {
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
- robj **kv = NULL; /* Key names. */
+ robj **knv = NULL; /* Key names. */
+ rkey **kov = NULL; /* Key objects. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
rio cmd, payload;
int may_retry = 1;
@@ -5121,18 +5123,21 @@ void migrateCommand(client *c) {
* this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
- kv = zrealloc(kv,sizeof(robj*)*num_keys);
+ knv = zrealloc(knv,sizeof(robj*)*num_keys);
+ kov = zrealloc(kov,sizeof(rkey*)*num_keys);
int oi = 0;
for (j = 0; j < num_keys; j++) {
- if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
- kv[oi] = c->argv[first_key+j];
+ rkey *ko;
+ if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j],&ko)) != NULL) {
+ knv[oi] = c->argv[first_key+j];
+ kov[oi] = ko;
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
@@ -5143,7 +5148,7 @@ try_again:
/* Connect */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
return; /* error sent to the client by migrateGetSocket() */
}
@@ -5173,7 +5178,7 @@ try_again:
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
- long long expireat = getExpire(c->db,kv[j]);
+ long long expireat = getExpire(kov[j]);
if (expireat != -1) {
ttl = expireat-mstime();
@@ -5186,7 +5191,9 @@ try_again:
/* Relocate valid (non expired) keys into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
- kv[non_expired++] = kv[j];
+ knv[non_expired] = knv[j];
+ kov[non_expired] = kov[j];
+ non_expired++;
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
@@ -5196,14 +5203,14 @@ try_again:
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
- serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
- serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
- sdslen(kv[j]->ptr)));
+ serverAssertWithInfo(c,NULL,sdsEncodedObject(knv[j]));
+ serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,knv[j]->ptr,
+ sdslen(knv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
- createDumpPayload(&payload,ov[j],kv[j]);
+ createDumpPayload(&payload,ov[j],kov[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
@@ -5283,13 +5290,13 @@ try_again:
} else {
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
- dbDelete(c->db,kv[j]);
- signalModifiedKey(c->db,kv[j]);
+ dbDelete(c->db,knv[j]);
+ signalModifiedKey(c->db,knv[j]);
server.dirty++;
/* Populate the argument vector to replace the old one. */
- newargv[del_idx++] = kv[j];
- incrRefCount(kv[j]);
+ newargv[del_idx++] = knv[j];
+ incrRefCount(knv[j]);
}
}
}
@@ -5347,7 +5354,7 @@ try_again:
}
sdsfree(cmd.io.buffer.ptr);
- zfree(ov); zfree(kv); zfree(newargv);
+ zfree(ov); zfree(knv); zfree(kov); zfree(newargv);
return;
/* On socket errors we try to close the cached socket and try again.
@@ -5374,7 +5381,7 @@ socket_err:
}
/* Cleanup we want to do if no retry is attempted. */
- zfree(ov); zfree(kv);
+ zfree(ov); zfree(knv); zfree(kov);
addReplySds(c,
sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance\r\n",
@@ -5554,7 +5561,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Migarting / Improrting slot? Count keys we don't have. */
if ((migrating_slot || importing_slot) &&
- lookupKeyRead(&server.db[0],thiskey) == NULL)
+ lookupKeyRead(&server.db[0],thiskey,NULL) == NULL)
{
missing_keys++;
}