summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-08-04 09:20:55 +0200
committerantirez <antirez@gmail.com>2015-10-01 13:02:24 +0200
commita7c5be18a81c120b4bdeb139072f27c899fe1a4d (patch)
treec7cf812a74ee27d575607a116b6c3fb4cbf91146
parent86d48efbfd40cba447025c36ac6b0c5507b032fd (diff)
downloadredis-a7c5be18a81c120b4bdeb139072f27c899fe1a4d.tar.gz
Lazyfree: Sorted sets convereted to plain SDS. (several commits squashed)
-rw-r--r--src/aof.c8
-rw-r--r--src/cluster.c5
-rw-r--r--src/db.c24
-rw-r--r--src/debug.c9
-rw-r--r--src/geo.c20
-rw-r--r--src/rdb.c27
-rw-r--r--src/server.c10
-rw-r--r--src/server.h17
-rw-r--r--src/sort.c10
-rw-r--r--src/t_zset.c442
10 files changed, 306 insertions, 266 deletions
diff --git a/src/aof.c b/src/aof.c
index 7a80a9355..3ab6c85b2 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -826,7 +826,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
dictEntry *de;
while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetKey(de);
+ sds ele = dictGetKey(de);
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
@@ -835,7 +835,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
- if (rioWriteBulkObject(r,eleobj) == 0) return 0;
+ if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
@@ -892,7 +892,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
dictEntry *de;
while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetKey(de);
+ sds ele = dictGetKey(de);
double *score = dictGetVal(de);
if (count == 0) {
@@ -904,7 +904,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkObject(r,key) == 0) return 0;
}
if (rioWriteBulkDouble(r,*score) == 0) return 0;
- if (rioWriteBulkObject(r,eleobj) == 0) return 0;
+ if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
diff --git a/src/cluster.c b/src/cluster.c
index 4c87a456f..6fdc22175 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4087,7 +4087,10 @@ void clusterCommand(client *c) {
keys = zmalloc(sizeof(robj*)*maxkeys);
numkeys = getKeysInSlot(slot, keys, maxkeys);
addReplyMultiBulkLen(c,numkeys);
- for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]);
+ for (j = 0; j < numkeys; j++) {
+ addReplyBulk(c,keys[j]);
+ decrRefCount(keys[j]);
+ }
zfree(keys);
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID> */
diff --git a/src/db.c b/src/db.c
index 92a590822..aa7be75ee 100644
--- a/src/db.c
+++ b/src/db.c
@@ -423,8 +423,8 @@ void scanCallback(void *privdata, const dictEntry *de) {
val = dictGetVal(de);
incrRefCount(val);
} else if (o->type == OBJ_ZSET) {
- key = dictGetKey(de);
- incrRefCount(key);
+ sds keysds = dictGetKey(de);
+ key = createStringObject(keysds,sdslen(keysds));
val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
} else {
serverPanic("Type not handled in SCAN callback.");
@@ -1181,14 +1181,13 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
void slotToKeyAdd(robj *key) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
- zslInsert(server.cluster->slots_to_keys,hashslot,key);
- incrRefCount(key);
+ sds sdskey = sdsdup(key->ptr);
+ zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
}
void slotToKeyDel(robj *key) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
-
- zslDelete(server.cluster->slots_to_keys,hashslot,key);
+ zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
}
void slotToKeyFlush(void) {
@@ -1196,6 +1195,9 @@ void slotToKeyFlush(void) {
server.cluster->slots_to_keys = zslCreate();
}
+/* Pupulate the specified array of objects with keys in the specified slot.
+ * New objects are returned to represent keys, it's up to the caller to
+ * decrement the reference count to release the keys names. */
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
zskiplistNode *n;
zrangespec range;
@@ -1206,7 +1208,7 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
while(n && n->score == hashslot && count--) {
- keys[j++] = n->obj;
+ keys[j++] = createStringObject(n->ele,sdslen(n->ele));
n = n->level[0].forward;
}
return j;
@@ -1224,9 +1226,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
while(n && n->score == hashslot) {
- robj *key = n->obj;
+ sds sdskey = n->ele;
+ robj *key = createStringObject(sdskey,sdslen(sdskey));
n = n->level[0].forward; /* Go to the next item before freeing it. */
- incrRefCount(key); /* Protect the object while freeing it. */
dbDelete(&server.db[0],key);
decrRefCount(key);
j++;
@@ -1248,7 +1250,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count = (zsl->length - (rank - 1));
/* Find last element in range */
@@ -1256,7 +1258,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
diff --git a/src/debug.c b/src/debug.c
index 61c1c3c88..fb13f7984 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -163,12 +163,9 @@ void computeDatasetDigest(unsigned char *final) {
listTypeReleaseIterator(li);
} else if (o->type == OBJ_SET) {
setTypeIterator *si = setTypeInitIterator(o);
- robj *ele;
sds sdsele;
while((sdsele = setTypeNextObject(si)) != NULL) {
- ele = createObject(OBJ_STRING,sdsele);
- xorObjectDigest(digest,ele);
- decrRefCount(ele);
+ xorDigest(digest,sdsele,sdslen(sdsele));
}
setTypeReleaseIterator(si);
} else if (o->type == OBJ_ZSET) {
@@ -210,12 +207,12 @@ void computeDatasetDigest(unsigned char *final) {
dictEntry *de;
while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetKey(de);
+ sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
snprintf(buf,sizeof(buf),"%.17g",*score);
memset(eledigest,0,20);
- mixObjectDigest(eledigest,eleobj);
+ mixDigest(eledigest,sdsele,sdslen(sdsele));
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
}
diff --git a/src/geo.c b/src/geo.c
index ff507fe70..d0381ce5e 100644
--- a/src/geo.c
+++ b/src/geo.c
@@ -112,7 +112,7 @@ int extractLongLatOrReply(client *c, robj **argv,
int longLatFromMember(robj *zobj, robj *member, double *xy) {
double score = 0;
- if (zsetScore(zobj, member, &score) == C_ERR) return C_ERR;
+ if (zsetScore(zobj, member->ptr, &score) == C_ERR) return C_ERR;
if (!decodeGeohash(score, xy)) return C_ERR;
return C_OK;
}
@@ -261,16 +261,14 @@ int geoGetPointsInRange(robj *zobj, double min, double max, double lon, double l
}
while (ln) {
- robj *o = ln->obj;
+ sds ele = ln->ele;
/* Abort when the node is no longer in range. */
if (!zslValueLteMax(ln->score, &range))
break;
- member = (o->encoding == OBJ_ENCODING_INT) ?
- sdsfromlonglong((long)o->ptr) :
- sdsdup(o->ptr);
- if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,member)
- == C_ERR) sdsfree(member);
+ ele = sdsdup(ele);
+ if (geoAppendIfWithinRadius(ga,lon,lat,radius,ln->score,ele)
+ == C_ERR) sdsfree(ele);
ln = ln->level[0].forward;
}
}
@@ -606,7 +604,7 @@ void geohashCommand(client *c) {
addReplyMultiBulkLen(c,c->argc-2);
for (j = 2; j < c->argc; j++) {
double score;
- if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
+ if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
addReply(c,shared.nullbulk);
} else {
/* The internal format we use for geocoding is a bit different
@@ -660,7 +658,7 @@ void geoposCommand(client *c) {
addReplyMultiBulkLen(c,c->argc-2);
for (j = 2; j < c->argc; j++) {
double score;
- if (zsetScore(zobj, c->argv[j], &score) == C_ERR) {
+ if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
addReply(c,shared.nullmultibulk);
} else {
/* Decode... */
@@ -700,8 +698,8 @@ void geodistCommand(client *c) {
/* Get the scores. We need both otherwise NULL is returned. */
double score1, score2, xyxy[4];
- if (zsetScore(zobj, c->argv[2], &score1) == C_ERR ||
- zsetScore(zobj, c->argv[3], &score2) == C_ERR)
+ if (zsetScore(zobj, c->argv[2]->ptr, &score1) == C_ERR ||
+ zsetScore(zobj, c->argv[3]->ptr, &score2) == C_ERR)
{
addReply(c,shared.nullbulk);
return;
diff --git a/src/rdb.c b/src/rdb.c
index 3d98a7673..61ac354bc 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -622,10 +622,11 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
nwritten += n;
while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetKey(de);
+ sds ele = dictGetKey(de);
double *score = dictGetVal(de);
- if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
+ if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
+ == -1) return -1;
nwritten += n;
if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
nwritten += n;
@@ -992,7 +993,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
return NULL;
if (o->encoding == OBJ_ENCODING_INTSET) {
- /* Fetch integer value from element */
+ /* Fetch integer value from element. */
if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
o->ptr = intsetAdd(o->ptr,llval,NULL);
} else {
@@ -1002,7 +1003,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
}
/* This will also be called when the set was just converted
- * to a regular hash table encoded set */
+ * to a regular hash table encoded set. */
if (o->encoding == OBJ_ENCODING_HT) {
dictAdd((dict*)o->ptr,sdsele,NULL);
} else {
@@ -1010,7 +1011,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
}
}
} else if (rdbtype == RDB_TYPE_ZSET) {
- /* Read list/set value */
+ /* Read list/set value. */
size_t zsetlen;
size_t maxelelen = 0;
zset *zs;
@@ -1019,23 +1020,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
o = createZsetObject();
zs = o->ptr;
- /* Load every single element of the list/set */
+ /* Load every single element of the sorted set. */
while(zsetlen--) {
- robj *ele;
+ sds sdsele;
double score;
zskiplistNode *znode;
- if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
- ele = tryObjectEncoding(ele);
+ if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS)) == NULL)
+ return NULL;
if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
/* Don't care about integer-encoded strings. */
- if (sdsEncodedObject(ele) && sdslen(ele->ptr) > maxelelen)
- maxelelen = sdslen(ele->ptr);
+ if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
- znode = zslInsert(zs->zsl,score,ele);
- dictAdd(zs->dict,ele,&znode->score);
- incrRefCount(ele); /* added to skiplist */
+ znode = zslInsert(zs->zsl,score,sdsele);
+ dictAdd(zs->dict,sdsele,&znode->score);
}
/* Convert *after* loading, since sorted sets are not stored ordered. */
diff --git a/src/server.c b/src/server.c
index bd45e7d29..bbec02967 100644
--- a/src/server.c
+++ b/src/server.c
@@ -554,11 +554,11 @@ dictType setDictType = {
/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
dictType zsetDictType = {
- dictEncObjHash, /* hash function */
+ dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
- dictEncObjKeyCompare, /* key compare */
- dictObjectDestructor, /* key destructor */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* Note: SDS string shared & freed by skiplist */
NULL /* val destructor */
};
@@ -1428,8 +1428,8 @@ void createSharedObjects(void) {
* actually used for their value but as a special object meaning
* respectively the minimum possible string and the maximum possible
* string in string comparisons for the ZRANGEBYLEX command. */
- shared.minstring = createStringObject("minstring",9);
- shared.maxstring = createStringObject("maxstring",9);
+ shared.minstring = sdsnew("minstring");
+ shared.maxstring = sdsnew("maxstring");
}
void initServerConfig(void) {
diff --git a/src/server.h b/src/server.h
index 50899f8fe..ff5d6432f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -612,16 +612,17 @@ struct sharedObjectsStruct {
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
- *lpush, *emptyscan, *minstring, *maxstring,
+ *lpush, *emptyscan,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */
+ sds minstring, maxstring;
};
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
- robj *obj;
+ sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
@@ -1261,15 +1262,15 @@ typedef struct {
/* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */
typedef struct {
- robj *min, *max; /* May be set to shared.(minstring|maxstring) */
+ sds min, max; /* May be set to shared.(minstring|maxstring) */
int minex, maxex; /* are min or max exclusive? */
} zlexrangespec;
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
-zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
-unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
-int zslDelete(zskiplist *zsl, double score, robj *obj);
+zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele);
+unsigned char *zzlInsert(unsigned char *zl, sds ele, double score);
+int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node);
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range);
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range);
double zzlGetScore(unsigned char *sptr);
@@ -1277,8 +1278,8 @@ void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding);
-int zsetScore(robj *zobj, robj *member, double *score);
-unsigned long zslGetRank(zskiplist *zsl, double score, robj *o);
+int zsetScore(robj *zobj, sds member, double *score);
+unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
/* Core functions */
int freeMemoryIfNeeded(void);
diff --git a/src/sort.c b/src/sort.c
index af185dbd4..536302bda 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -399,7 +399,7 @@ void sortCommand(client *c) {
zset *zs = sortval->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
- robj *ele;
+ sds sdsele;
int rangelen = vectorlen;
/* Check if starting point is trivial, before doing log(N) lookup. */
@@ -417,8 +417,8 @@ void sortCommand(client *c) {
while(rangelen--) {
serverAssertWithInfo(c,sortval,ln != NULL);
- ele = ln->obj;
- vector[j].obj = ele;
+ sdsele = ln->ele;
+ vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
vector[j].u.score = 0;
vector[j].u.cmpobj = NULL;
j++;
@@ -431,9 +431,11 @@ void sortCommand(client *c) {
dict *set = ((zset*)sortval->ptr)->dict;
dictIterator *di;
dictEntry *setele;
+ sds sdsele;
di = dictGetIterator(set);
while((setele = dictNext(di)) != NULL) {
- vector[j].obj = dictGetKey(setele);
+ sdsele = dictGetKey(setele);
+ vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
vector[j].u.score = 0;
vector[j].u.cmpobj = NULL;
j++;
diff --git a/src/t_zset.c b/src/t_zset.c
index 772b06ef4..84bea5aa8 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -38,9 +38,16 @@
*
* The elements are added to a hash table mapping Redis objects to scores.
* At the same time the elements are added to a skip list mapping scores
- * to Redis objects (so objects are sorted by scores in this "view"). */
-
-/* This skiplist implementation is almost a C translation of the original
+ * to Redis objects (so objects are sorted by scores in this "view").
+ *
+ * Note that the SDS string representing the element is the same in both
+ * the hash table and skiplist in order to save memory. What we do in order
+ * to manage the shared SDS string more easily is to free the SDS string
+ * only in zslFreeNode(). The dictionary has no value free method set.
+ * So we should always remove an element from the dictionary, and later from
+ * the skiplist.
+ *
+ * This skiplist implementation is almost a C translation of the original
* algorithm described by William Pugh in "Skip Lists: A Probabilistic
* Alternative to Balanced Trees", modified in three ways:
* a) this implementation allows for repeated scores.
@@ -52,16 +59,24 @@
#include "server.h"
#include <math.h>
-static int zslLexValueGteMin(robj *value, zlexrangespec *spec);
-static int zslLexValueLteMax(robj *value, zlexrangespec *spec);
+/*-----------------------------------------------------------------------------
+ * Skiplist implementation of the low level API
+ *----------------------------------------------------------------------------*/
+
+static int zslLexValueGteMin(sds value, zlexrangespec *spec);
+static int zslLexValueLteMax(sds value, zlexrangespec *spec);
-zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
- zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
+/* Create a skiplist node with the specified number of levels.
+ * The SDS string 'ele' is referenced by the node after the call. */
+zskiplistNode *zslCreateNode(int level, double score, sds ele) {
+ zskiplistNode *zn =
+ zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
- zn->obj = obj;
+ zn->ele = ele;
return zn;
}
+/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
@@ -79,11 +94,15 @@ zskiplist *zslCreate(void) {
return zsl;
}
+/* Free the specified skiplist node. The referenced SDS string representation
+ * of the element is freed too, unless node->ele is set to NULL before calling
+ * this function. */
void zslFreeNode(zskiplistNode *node) {
- decrRefCount(node->obj);
+ sdsfree(node->ele);
zfree(node);
}
+/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
@@ -107,7 +126,10 @@ int zslRandomLevel(void) {
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
-zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
+/* Insert a new node in the skiplist. Assumes the element does not already
+ * exist (up to the caller to enforce that). The skiplist takes ownership
+ * of the passed SDS string 'ele'. */
+zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
@@ -118,18 +140,19 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
- (x->level[i].forward->score < score ||
- (x->level[i].forward->score == score &&
- compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ sdscmp(x->level[i].forward->ele,ele) < 0)))
+ {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
- /* we assume the key is not already inside, since we allow duplicated
- * scores, and the re-insertion of score and redis object should never
- * happen since the caller of zslInsert() should test in the hash table
- * if the element is already inside or not. */
+ /* we assume the element is not already inside, since we allow duplicated
+ * scores, reinserting the same element should never happen since the
+ * caller of zslInsert() should test in the hash table if the element is
+ * already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
@@ -139,7 +162,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
}
zsl->level = level;
}
- x = zslCreateNode(level,score,obj);
+ x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
@@ -184,26 +207,38 @@ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
zsl->length--;
}
-/* Delete an element with matching score/object from the skiplist. */
-int zslDelete(zskiplist *zsl, double score, robj *obj) {
+/* Delete an element with matching score/element from the skiplist.
+ * The function returns 1 if the node was found and deleted, otherwise
+ * 0 is returned.
+ *
+ * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
+ * it is not freed (but just unlinked) and *node is set to the node pointer,
+ * so that it is possible for the caller to reuse the node (including the
+ * referenced SDS string at node->ele). */
+int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
- (x->level[i].forward->score < score ||
- (x->level[i].forward->score == score &&
- compareStringObjects(x->level[i].forward->obj,obj) < 0)))
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ sdscmp(x->level[i].forward->ele,ele) < 0)))
+ {
x = x->level[i].forward;
+ }
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
- if (x && score == x->score && equalStringObjects(x->obj,obj)) {
+ if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
- zslFreeNode(x);
+ if (!node)
+ zslFreeNode(x);
+ else
+ *node = x;
return 1;
}
return 0; /* not found */
@@ -312,8 +347,8 @@ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dic
{
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
- dictDelete(dict,x->obj);
- zslFreeNode(x);
+ dictDelete(dict,x->ele);
+ zslFreeNode(x); /* Here is where x->ele is actually released. */
removed++;
x = next;
}
@@ -329,7 +364,7 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
- !zslLexValueGteMin(x->level[i].forward->obj,range))
+ !zslLexValueGteMin(x->level[i].forward->ele,range))
x = x->level[i].forward;
update[i] = x;
}
@@ -338,11 +373,11 @@ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *di
x = x->level[0].forward;
/* Delete nodes while in range. */
- while (x && zslLexValueLteMax(x->obj,range)) {
+ while (x && zslLexValueLteMax(x->ele,range)) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
- dictDelete(dict,x->obj);
- zslFreeNode(x);
+ dictDelete(dict,x->ele);
+ zslFreeNode(x); /* Here is where x->ele is actually released. */
removed++;
x = next;
}
@@ -370,7 +405,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
- dictDelete(dict,x->obj);
+ dictDelete(dict,x->ele);
zslFreeNode(x);
removed++;
traversed++;
@@ -383,7 +418,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
-unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
+unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
@@ -393,13 +428,13 @@ unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
- compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
+ sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
- if (x->obj && equalStringObjects(x->obj,o)) {
+ if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
@@ -478,7 +513,7 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
*
* If the string is not a valid range C_ERR is returned, and the value
* of *dest and *ex is undefined. */
-int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
+int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
char *c = item->ptr;
switch(c[0]) {
@@ -486,27 +521,34 @@ int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
if (c[1] != '\0') return C_ERR;
*ex = 0;
*dest = shared.maxstring;
- incrRefCount(shared.maxstring);
return C_OK;
case '-':
if (c[1] != '\0') return C_ERR;
*ex = 0;
*dest = shared.minstring;
- incrRefCount(shared.minstring);
return C_OK;
case '(':
*ex = 1;
- *dest = createStringObject(c+1,sdslen(c)-1);
+ *dest = sdsnewlen(c+1,sdslen(c)-1);
return C_OK;
case '[':
*ex = 0;
- *dest = createStringObject(c+1,sdslen(c)-1);
+ *dest = sdsnewlen(c+1,sdslen(c)-1);
return C_OK;
default:
return C_ERR;
}
}
+/* Free a lex range structure, must be called only after zelParseLexRange()
+ * populated the structure with success (C_OK returned). */
+void zslFreeLexRange(zlexrangespec *spec) {
+ if (spec->min != shared.minstring &&
+ spec->min != shared.maxstring) sdsfree(spec->min);
+ if (spec->max != shared.minstring &&
+ spec->max != shared.maxstring) sdsfree(spec->max);
+}
+
/* Populate the rangespec according to the objects min and max.
*
* Return C_OK on success. On error C_ERR is returned.
@@ -521,42 +563,33 @@ static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
spec->min = spec->max = NULL;
if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
- if (spec->min) decrRefCount(spec->min);
- if (spec->max) decrRefCount(spec->max);
+ zslFreeLexRange(spec);
return C_ERR;
} else {
return C_OK;
}
}
-/* Free a lex range structure, must be called only after zelParseLexRange()
- * populated the structure with success (C_OK returned). */
-void zslFreeLexRange(zlexrangespec *spec) {
- decrRefCount(spec->min);
- decrRefCount(spec->max);
-}
-
-/* This is just a wrapper to compareStringObjects() that is able to
+/* This is just a wrapper to sdscmp() that is able to
* handle shared.minstring and shared.maxstring as the equivalent of
* -inf and +inf for strings */
-int compareStringObjectsForLexRange(robj *a, robj *b) {
- if (a == b) return 0; /* This makes sure that we handle inf,inf and
- -inf,-inf ASAP. One special case less. */
+int sdscmplex(sds a, sds b) {
+ if (a == b) return 0;
if (a == shared.minstring || b == shared.maxstring) return -1;
if (a == shared.maxstring || b == shared.minstring) return 1;
- return compareStringObjects(a,b);
+ return sdscmp(a,b);
}
-static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
+static int zslLexValueGteMin(sds value, zlexrangespec *spec) {
return spec->minex ?
- (compareStringObjectsForLexRange(value,spec->min) > 0) :
- (compareStringObjectsForLexRange(value,spec->min) >= 0);
+ (sdscmplex(value,spec->min) > 0) :
+ (sdscmplex(value,spec->min) >= 0);
}
-static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
+static int zslLexValueLteMax(sds value, zlexrangespec *spec) {
return spec->maxex ?
- (compareStringObjectsForLexRange(value,spec->max) < 0) :
- (compareStringObjectsForLexRange(value,spec->max) <= 0);
+ (sdscmplex(value,spec->max) < 0) :
+ (sdscmplex(value,spec->max) <= 0);
}
/* Returns if there is a part of the zset is in the lex range. */
@@ -564,15 +597,15 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
- if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
- (compareStringObjects(range->min,range->max) == 0 &&
+ if (sdscmplex(range->min,range->max) > 1 ||
+ (sdscmp(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
return 0;
x = zsl->tail;
- if (x == NULL || !zslLexValueGteMin(x->obj,range))
+ if (x == NULL || !zslLexValueGteMin(x->ele,range))
return 0;
x = zsl->header->level[0].forward;
- if (x == NULL || !zslLexValueLteMax(x->obj,range))
+ if (x == NULL || !zslLexValueLteMax(x->ele,range))
return 0;
return 1;
}
@@ -590,7 +623,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
- !zslLexValueGteMin(x->level[i].forward->obj,range))
+ !zslLexValueGteMin(x->level[i].forward->ele,range))
x = x->level[i].forward;
}
@@ -599,7 +632,7 @@ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
serverAssert(x != NULL);
/* Check if score <= max. */
- if (!zslLexValueLteMax(x->obj,range)) return NULL;
+ if (!zslLexValueLteMax(x->ele,range)) return NULL;
return x;
}
@@ -616,7 +649,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
- zslLexValueLteMax(x->level[i].forward->obj,range))
+ zslLexValueLteMax(x->level[i].forward->ele,range))
x = x->level[i].forward;
}
@@ -624,7 +657,7 @@ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
serverAssert(x != NULL);
/* Check if score >= min. */
- if (!zslLexValueGteMin(x->obj,range)) return NULL;
+ if (!zslLexValueGteMin(x->ele,range)) return NULL;
return x;
}
@@ -653,10 +686,8 @@ double zzlGetScore(unsigned char *sptr) {
return score;
}
-/* Return a ziplist element as a Redis string object.
- * This simple abstraction can be used to simplifies some code at the
- * cost of some performance. */
-robj *ziplistGetObject(unsigned char *sptr) {
+/* Return a ziplist element as an SDS string. */
+sds ziplistGetObject(unsigned char *sptr) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
@@ -665,9 +696,9 @@ robj *ziplistGetObject(unsigned char *sptr) {
serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
if (vstr) {
- return createStringObject((char*)vstr,vlen);
+ return sdsnewlen((char*)vstr,vlen);
} else {
- return createStringObjectFromLongLong(vlong);
+ return sdsfromlonglong(vlong);
}
}
@@ -822,16 +853,16 @@ unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
}
static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
- robj *value = ziplistGetObject(p);
+ sds value = ziplistGetObject(p);
int res = zslLexValueGteMin(value,spec);
- decrRefCount(value);
+ sdsfree(value);
return res;
}
static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
- robj *value = ziplistGetObject(p);
+ sds value = ziplistGetObject(p);
int res = zslLexValueLteMax(value,spec);
- decrRefCount(value);
+ sdsfree(value);
return res;
}
@@ -841,8 +872,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *p;
/* Test for ranges that will always be empty. */
- if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
- (compareStringObjects(range->min,range->max) == 0 &&
+ if (sdscmplex(range->min,range->max) > 1 ||
+ (sdscmp(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
return 0;
@@ -912,26 +943,22 @@ unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
return NULL;
}
-unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
+unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
- ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
- serverAssertWithInfo(NULL,ele,sptr != NULL);
+ serverAssert(sptr != NULL);
- if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
+ if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
/* Matching element, pull out score. */
if (score != NULL) *score = zzlGetScore(sptr);
- decrRefCount(ele);
return eptr;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
-
- decrRefCount(ele);
return NULL;
}
@@ -946,41 +973,38 @@ unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
return zl;
}
-unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
+unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
unsigned char *sptr;
char scorebuf[128];
int scorelen;
size_t offset;
- serverAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
- zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
+ zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL);
zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
} else {
/* Keep offset relative to zl, as it might be re-allocated. */
offset = eptr-zl;
- zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
+ zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
eptr = zl+offset;
/* Insert score after the element. */
- serverAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
+ serverAssert((sptr = ziplistNext(zl,eptr)) != NULL);
zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
}
-
return zl;
}
/* Insert (element,score) pair in ziplist. This function assumes the element is
* not yet present in the list. */
-unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
+unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
- ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
- serverAssertWithInfo(NULL,ele,sptr != NULL);
+ serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
@@ -991,7 +1015,7 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
- if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
+ if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
@@ -1004,8 +1028,6 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
-
- decrRefCount(ele);
return zl;
}
@@ -1093,7 +1115,7 @@ unsigned int zsetLength(robj *zobj) {
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
- robj *ele;
+ sds ele;
double score;
if (zobj->encoding == encoding) return;
@@ -1120,14 +1142,12 @@ void zsetConvert(robj *zobj, int encoding) {
score = zzlGetScore(sptr);
serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
- ele = createStringObjectFromLongLong(vlong);
+ ele = sdsfromlonglong(vlong);
else
- ele = createStringObject((char*)vstr,vlen);
+ ele = sdsnewlen((char*)vstr,vlen);
- /* Has incremented refcount since it was just created. */
node = zslInsert(zs->zsl,score,ele);
- serverAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
- incrRefCount(ele); /* Added to dictionary. */
+ serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
zzlNext(zl,&eptr,&sptr);
}
@@ -1149,10 +1169,7 @@ void zsetConvert(robj *zobj, int encoding) {
zfree(zs->zsl);
while (node) {
- ele = getDecodedObject(node->obj);
- zl = zzlInsertAt(zl,NULL,ele,node->score);
- decrRefCount(ele);
-
+ zl = zzlInsertAt(zl,NULL,node->ele,node->score);
next = node->level[0].forward;
zslFreeNode(node);
node = next;
@@ -1170,7 +1187,7 @@ void zsetConvert(robj *zobj, int encoding) {
* storing it into *score. If the element does not exist C_ERR is returned
* otherwise C_OK is returned and *score is correctly populated.
* If 'zobj' or 'member' is NULL, C_ERR is returned. */
-int zsetScore(robj *zobj, robj *member, double *score) {
+int zsetScore(robj *zobj, sds member, double *score) {
if (!zobj || !member) return C_ERR;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -1199,9 +1216,8 @@ int zsetScore(robj *zobj, robj *member, double *score) {
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
- robj *ele;
robj *zobj;
- robj *curobj;
+ sds ele;
double score = 0, *scores = NULL, curscore = 0.0;
int j, elements;
int scoreidx = 0;
@@ -1285,11 +1301,10 @@ void zaddGenericCommand(client *c, int flags) {
for (j = 0; j < elements; j++) {
score = scores[j];
+ ele = c->argv[scoreidx+1+j*2]->ptr;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
- /* Prefer non-encoded element when dealing with ziplists. */
- ele = c->argv[scoreidx+1+j*2];
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
if (nx) continue;
if (incr) {
@@ -1314,7 +1329,7 @@ void zaddGenericCommand(client *c, int flags) {
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
- if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+ if (sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
server.dirty++;
added++;
@@ -1325,12 +1340,9 @@ void zaddGenericCommand(client *c, int flags) {
zskiplistNode *znode;
dictEntry *de;
- ele = c->argv[scoreidx+1+j*2] =
- tryObjectEncoding(c->argv[scoreidx+1+j*2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
if (nx) continue;
- curobj = dictGetKey(de);
curscore = *(double*)dictGetVal(de);
if (incr) {
@@ -1343,23 +1355,27 @@ void zaddGenericCommand(client *c, int flags) {
}
}
- /* Remove and re-insert when score changed. We can safely
- * delete the key object from the skiplist, since the
- * dictionary still has a reference to it. */
+ /* Remove and re-insert when score changes. */
if (score != curscore) {
- serverAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
- znode = zslInsert(zs->zsl,score,curobj);
- incrRefCount(curobj); /* Re-inserted in skiplist. */
+ zskiplistNode *node;
+ serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
+ znode = zslInsert(zs->zsl,score,node->ele);
+ /* We reused the node->ele SDS string, free the node now
+ * since zslInsert created a new one. */
+ node->ele = NULL;
+ zslFreeNode(node);
+ /* Note that we did not removed the original element from
+ * the hash table representing the sorted set, so we just
+ * update the score. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
server.dirty++;
updated++;
}
processed++;
} else if (!xx) {
+ ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);
- incrRefCount(ele); /* Inserted in skiplist. */
- serverAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
- incrRefCount(ele); /* Added to dictionary. */
+ serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
server.dirty++;
added++;
processed++;
@@ -1408,7 +1424,7 @@ void zremCommand(client *c) {
unsigned char *eptr;
for (j = 2; j < c->argc; j++) {
- if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
+ if ((eptr = zzlFind(zobj->ptr,c->argv[j]->ptr,NULL)) != NULL) {
deleted++;
zobj->ptr = zzlDelete(zobj->ptr,eptr);
if (zzlLength(zobj->ptr) == 0) {
@@ -1424,16 +1440,26 @@ void zremCommand(client *c) {
double score;
for (j = 2; j < c->argc; j++) {
- de = dictFind(zs->dict,c->argv[j]);
+ de = dictFind(zs->dict,c->argv[j]->ptr);
if (de != NULL) {
deleted++;
- /* Delete from the skiplist */
+ /* Get the score in order to delete from the skiplist later. */
score = *(double*)dictGetVal(de);
- serverAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
- /* Delete from the hash table */
- dictDelete(zs->dict,c->argv[j]);
+ /* Delete from the hash table and later from the skiplist.
+ * Note that the order is important: deleting from the skiplist
+ * actually releases the SDS string representing the element,
+ * which is shared between the skiplist and the hash table, so
+ * we need to delete from the skiplist as the final step. */
+ int retval1 = dictDelete(zs->dict,c->argv[j]->ptr);
+
+ /* Delete from skiplist. */
+ int retval2 = zslDelete(zs->zsl,score,c->argv[j]->ptr,NULL);
+
+ serverAssertWithInfo(c,c->argv[j],
+ retval1 == DICT_OK && retval2);
+
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
@@ -1613,7 +1639,7 @@ typedef struct {
* we already checked that "ell" holds a long long, or tried to convert another
* representation into a long long value. When this was successful,
* OPVAL_VALID_LL is set as well. */
-#define OPVAL_DIRTY_ROBJ 1
+#define OPVAL_DIRTY_SDS 1
#define OPVAL_DIRTY_LL 2
#define OPVAL_VALID_LL 4
@@ -1621,7 +1647,7 @@ typedef struct {
typedef struct {
int flags;
unsigned char _buf[32]; /* Private buffer. */
- robj *ele;
+ sds ele;
unsigned char *estr;
unsigned int elen;
long long ell;
@@ -1728,8 +1754,8 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
if (op->subject == NULL)
return 0;
- if (val->flags & OPVAL_DIRTY_ROBJ)
- decrRefCount(val->ele);
+ if (val->flags & OPVAL_DIRTY_SDS)
+ sdsfree(val->ele);
memset(val,0,sizeof(zsetopval));
@@ -1770,7 +1796,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
} else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
if (it->sl.node == NULL)
return 0;
- val->ele = it->sl.node->obj;
+ val->ele = it->sl.node->ele;
val->score = it->sl.node->score;
/* Move to next element. */
@@ -1789,15 +1815,8 @@ int zuiLongLongFromValue(zsetopval *val) {
val->flags |= OPVAL_DIRTY_LL;
if (val->ele != NULL) {
- if (val->ele->encoding == OBJ_ENCODING_INT) {
- val->ell = (long)val->ele->ptr;
+ if (string2ll(val->ele,sdslen(val->ele),&val->ell))
val->flags |= OPVAL_VALID_LL;
- } else if (sdsEncodedObject(val->ele)) {
- if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
- val->flags |= OPVAL_VALID_LL;
- } else {
- serverPanic("Unsupported element encoding");
- }
} else if (val->estr != NULL) {
if (string2ll((char*)val->estr,val->elen,&val->ell))
val->flags |= OPVAL_VALID_LL;
@@ -1809,30 +1828,41 @@ int zuiLongLongFromValue(zsetopval *val) {
return val->flags & OPVAL_VALID_LL;
}
-robj *zuiObjectFromValue(zsetopval *val) {
+sds zuiSdsFromValue(zsetopval *val) {
if (val->ele == NULL) {
if (val->estr != NULL) {
- val->ele = createStringObject((char*)val->estr,val->elen);
+ val->ele = sdsnewlen((char*)val->estr,val->elen);
} else {
- val->ele = createStringObjectFromLongLong(val->ell);
+ val->ele = sdsfromlonglong(val->ell);
}
- val->flags |= OPVAL_DIRTY_ROBJ;
+ val->flags |= OPVAL_DIRTY_SDS;
}
return val->ele;
}
+/* This is different from zuiSdsFromValue since returns a new SDS string
+ * which is up to the caller to free. */
+sds zuiNewSdsFromValue(zsetopval *val) {
+ if (val->flags & OPVAL_DIRTY_SDS) {
+ /* We have already one to return! */
+ sds ele = val->ele;
+ val->flags &= ~OPVAL_DIRTY_SDS;
+ val->ele = NULL;
+ return ele;
+ } else if (val->ele) {
+ return sdsdup(val->ele);
+ } else if (val->estr) {
+ return sdsnewlen((char*)val->estr,val->elen);
+ } else {
+ return sdsfromlonglong(val->ell);
+ }
+}
+
int zuiBufferFromValue(zsetopval *val) {
if (val->estr == NULL) {
if (val->ele != NULL) {
- if (val->ele->encoding == OBJ_ENCODING_INT) {
- val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
- val->estr = val->_buf;
- } else if (sdsEncodedObject(val->ele)) {
- val->elen = sdslen(val->ele->ptr);
- val->estr = val->ele->ptr;
- } else {
- serverPanic("Unsupported element encoding");
- }
+ val->elen = sdslen(val->ele);
+ val->estr = (unsigned char*)val->ele;
} else {
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
val->estr = val->_buf;
@@ -1859,7 +1889,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
}
} else if (op->encoding == OBJ_ENCODING_HT) {
dict *ht = op->subject->ptr;
- zuiObjectFromValue(val);
+ zuiSdsFromValue(val);
if (dictFind(ht,val->ele) != NULL) {
*score = 1.0;
return 1;
@@ -1870,7 +1900,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
serverPanic("Unknown set encoding");
}
} else if (op->type == OBJ_ZSET) {
- zuiObjectFromValue(val);
+ zuiSdsFromValue(val);
if (op->encoding == OBJ_ENCODING_ZIPLIST) {
if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
@@ -1922,13 +1952,25 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat
}
}
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+
+dictType setAccumulatorDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL /* val destructor */
+};
+
void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
int i, j;
long setnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src;
zsetopval zval;
- robj *tmp;
+ sds tmp;
unsigned int maxelelen = 0;
robj *dstobj;
zset *dstzset;
@@ -1978,7 +2020,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
int remaining = c->argc - j;
while (remaining) {
- if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
+ if (remaining >= (setnum + 1) &&
+ !strcasecmp(c->argv[j]->ptr,"weights"))
+ {
j++; remaining--;
for (i = 0; i < setnum; i++, j++, remaining--) {
if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
@@ -1988,7 +2032,9 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
return;
}
}
- } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
+ } else if (remaining >= 2 &&
+ !strcasecmp(c->argv[j]->ptr,"aggregate"))
+ {
j++; remaining--;
if (!strcasecmp(c->argv[j]->ptr,"sum")) {
aggregate = REDIS_AGGR_SUM;
@@ -2046,22 +2092,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
/* Only continue when present in every input. */
if (j == setnum) {
- tmp = zuiObjectFromValue(&zval);
+ tmp = zuiNewSdsFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp);
- incrRefCount(tmp); /* added to skiplist */
dictAdd(dstzset->dict,tmp,&znode->score);
- incrRefCount(tmp); /* added to dictionary */
-
- if (sdsEncodedObject(tmp)) {
- if (sdslen(tmp->ptr) > maxelelen)
- maxelelen = sdslen(tmp->ptr);
- }
+ if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
}
}
zuiClearIterator(&src[0]);
}
} else if (op == SET_OP_UNION) {
- dict *accumulator = dictCreate(&setDictType,NULL);
+ dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
dictIterator *di;
dictEntry *de;
double score;
@@ -2084,20 +2124,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
if (isnan(score)) score = 0;
/* Search for this element in the accumulating dictionary. */
- de = dictFind(accumulator,zuiObjectFromValue(&zval));
+ de = dictFind(accumulator,zuiSdsFromValue(&zval));
/* If we don't have it, we need to create a new entry. */
if (de == NULL) {
- tmp = zuiObjectFromValue(&zval);
+ tmp = zuiNewSdsFromValue(&zval);
/* Remember the longest single element encountered,
* to understand if it's possible to convert to ziplist
* at the end. */
- if (sdsEncodedObject(tmp)) {
- if (sdslen(tmp->ptr) > maxelelen)
- maxelelen = sdslen(tmp->ptr);
- }
+ if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
/* Add the element with its initial score. */
de = dictAddRaw(accumulator,tmp);
- incrRefCount(tmp);
dictSetDoubleVal(de,score);
} else {
/* Update the score with the score of the new instance
@@ -2121,16 +2157,12 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
dictExpand(dstzset->dict,dictSize(accumulator));
while((de = dictNext(di)) != NULL) {
- robj *ele = dictGetKey(de);
+ sds ele = dictGetKey(de);
score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl,score,ele);
- incrRefCount(ele); /* added to skiplist */
dictAdd(dstzset->dict,ele,&znode->score);
- incrRefCount(ele); /* added to dictionary */
}
dictReleaseIterator(di);
-
- /* We can free the accumulator dictionary now. */
dictRelease(accumulator);
} else {
serverPanic("Unknown operator");
@@ -2247,7 +2279,7 @@ void zrangeGenericCommand(client *c, int reverse) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
- robj *ele;
+ sds ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
@@ -2262,8 +2294,8 @@ void zrangeGenericCommand(client *c, int reverse) {
while(rangelen--) {
serverAssertWithInfo(c,zobj,ln != NULL);
- ele = ln->obj;
- addReplyBulk(c,ele);
+ ele = ln->ele;
+ addReplyBulkCBuffer(c,ele,sdslen(ele));
if (withscores)
addReplyDouble(c,ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
@@ -2317,8 +2349,13 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
pos++; remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
- if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
- (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return;
+ if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
+ != C_OK) ||
+ (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
+ != C_OK))
+ {
+ return;
+ }
pos += 3; remaining -= 3;
} else {
addReply(c,shared.syntaxerr);
@@ -2444,7 +2481,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
}
rangelen++;
- addReplyBulk(c,ln->obj);
+ addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
if (withscores) {
addReplyDouble(c,ln->score);
@@ -2534,7 +2571,7 @@ void zcountCommand(client *c) {
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count = (zsl->length - (rank - 1));
/* Find last element in range */
@@ -2542,7 +2579,7 @@ void zcountCommand(client *c) {
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
@@ -2612,7 +2649,7 @@ void zlexcountCommand(client *c) {
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count = (zsl->length - (rank - 1));
/* Find last element in range */
@@ -2620,7 +2657,7 @@ void zlexcountCommand(client *c) {
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
- rank = zslGetRank(zsl, zn->score, zn->obj);
+ rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
@@ -2786,13 +2823,13 @@ void genericZrangebylexCommand(client *c, int reverse) {
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
- if (!zslLexValueGteMin(ln->obj,&range)) break;
+ if (!zslLexValueGteMin(ln->ele,&range)) break;
} else {
- if (!zslLexValueLteMax(ln->obj,&range)) break;
+ if (!zslLexValueLteMax(ln->ele,&range)) break;
}
rangelen++;
- addReplyBulk(c,ln->obj);
+ addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
/* Move to next node */
if (reverse) {
@@ -2835,7 +2872,7 @@ void zscoreCommand(client *c) {
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
- if (zsetScore(zobj,c->argv[2],&score) == C_ERR) {
+ if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
addReply(c,shared.nullbulk);
} else {
addReplyDouble(c,score);
@@ -2887,11 +2924,12 @@ void zrankGenericCommand(client *c, int reverse) {
double score;
ele = c->argv[2];
- de = dictFind(zs->dict,ele);
+ de = dictFind(zs->dict,ele->ptr);
if (de != NULL) {
score = *(double*)dictGetVal(de);
- rank = zslGetRank(zsl,score,ele);
- serverAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
+ rank = zslGetRank(zsl,score,ele->ptr);
+ /* Existing elements always have a rank. */
+ serverAssertWithInfo(c,ele,rank);
if (reverse)
addReplyLongLong(c,llen-rank);
else