diff options
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 59 |
1 files changed, 33 insertions, 26 deletions
diff --git a/src/cluster.c b/src/cluster.c index c85e3791d..505c3c757 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4776,7 +4776,7 @@ NULL /* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ -void createDumpPayload(rio *payload, robj *o, robj *key) { +void createDumpPayload(rio *payload, robj *o, rkey *key) { unsigned char buf[2]; uint64_t crc; @@ -4833,16 +4833,17 @@ int verifyDumpPayload(unsigned char *p, size_t len) { * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { robj *o, *dumpobj; + rkey *key; rio payload; /* Check if the key is here. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + if ((o = lookupKeyRead(c->db,c->argv[1],&key)) == NULL) { addReplyNull(c); return; } /* Create the DUMP encoded representation. */ - createDumpPayload(&payload,o,c->argv[1]); + createDumpPayload(&payload,o,key); /* Transfer to the client */ dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); @@ -4893,7 +4894,7 @@ void restoreCommand(client *c) { } /* Make sure this key does not already exist here... */ - if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) { + if (!replace && lookupKeyWrite(c->db,c->argv[1],NULL) != NULL) { addReply(c,shared.busykeyerr); return; } @@ -4925,12 +4926,12 @@ void restoreCommand(client *c) { if (replace) dbDelete(c->db,c->argv[1]); /* Create the key and set the TTL if any */ - dbAdd(c->db,c->argv[1],obj); + rkey *key = dbAdd(c->db,c->argv[1],obj); if (ttl) { if (!absttl) ttl+=mstime(); - setExpire(c,c->db,c->argv[1],ttl); + setExpire(c,c->db,key,ttl); } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(key,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; @@ -5066,7 +5067,8 @@ void migrateCommand(client *c) { long timeout; long dbid; robj **ov = NULL; /* Objects to migrate. */ - robj **kv = NULL; /* Key names. */ + robj **knv = NULL; /* Key names. */ + rkey **kov = NULL; /* Key objects. */ robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */ rio cmd, payload; int may_retry = 1; @@ -5121,18 +5123,21 @@ void migrateCommand(client *c) { * this case, since often this is due to a normal condition like the key * expiring in the meantime. */ ov = zrealloc(ov,sizeof(robj*)*num_keys); - kv = zrealloc(kv,sizeof(robj*)*num_keys); + knv = zrealloc(knv,sizeof(robj*)*num_keys); + kov = zrealloc(kov,sizeof(rkey*)*num_keys); int oi = 0; for (j = 0; j < num_keys; j++) { - if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { - kv[oi] = c->argv[first_key+j]; + rkey *ko; + if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j],&ko)) != NULL) { + knv[oi] = c->argv[first_key+j]; + kov[oi] = ko; oi++; } } num_keys = oi; if (num_keys == 0) { - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); addReplySds(c,sdsnew("+NOKEY\r\n")); return; } @@ -5143,7 +5148,7 @@ try_again: /* Connect */ cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); if (cs == NULL) { - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); return; /* error sent to the client by migrateGetSocket() */ } @@ -5173,7 +5178,7 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); + long long expireat = getExpire(kov[j]); if (expireat != -1) { ttl = expireat-mstime(); @@ -5186,7 +5191,9 @@ try_again: /* Relocate valid (non expired) keys into the array in successive * positions to remove holes created by the keys that were present * in the first lookup but are now expired after the second lookup. */ - kv[non_expired++] = kv[j]; + knv[non_expired] = knv[j]; + kov[non_expired] = kov[j]; + non_expired++; serverAssertWithInfo(c,NULL, rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); @@ -5196,14 +5203,14 @@ try_again: rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); else serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); - serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, - sdslen(kv[j]->ptr))); + serverAssertWithInfo(c,NULL,sdsEncodedObject(knv[j])); + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,knv[j]->ptr, + sdslen(knv[j]->ptr))); serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); /* Emit the payload argument, that is the serialized object using * the DUMP format. */ - createDumpPayload(&payload,ov[j],kv[j]); + createDumpPayload(&payload,ov[j],kov[j]); serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); @@ -5283,13 +5290,13 @@ try_again: } else { if (!copy) { /* No COPY option: remove the local key, signal the change. */ - dbDelete(c->db,kv[j]); - signalModifiedKey(c->db,kv[j]); + dbDelete(c->db,knv[j]); + signalModifiedKey(c->db,knv[j]); server.dirty++; /* Populate the argument vector to replace the old one. */ - newargv[del_idx++] = kv[j]; - incrRefCount(kv[j]); + newargv[del_idx++] = knv[j]; + incrRefCount(knv[j]); } } } @@ -5347,7 +5354,7 @@ try_again: } sdsfree(cmd.io.buffer.ptr); - zfree(ov); zfree(kv); zfree(newargv); + zfree(ov); zfree(knv); zfree(kov); zfree(newargv); return; /* On socket errors we try to close the cached socket and try again. @@ -5374,7 +5381,7 @@ socket_err: } /* Cleanup we want to do if no retry is attempted. */ - zfree(ov); zfree(kv); + zfree(ov); zfree(knv); zfree(kov); addReplySds(c, sdscatprintf(sdsempty(), "-IOERR error or timeout %s to target instance\r\n", @@ -5554,7 +5561,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Migarting / Improrting slot? Count keys we don't have. */ if ((migrating_slot || importing_slot) && - lookupKeyRead(&server.db[0],thiskey) == NULL) + lookupKeyRead(&server.db[0],thiskey,NULL) == NULL) { missing_keys++; } |