summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2012-02-04 08:11:31 +0100
committerantirez <antirez@gmail.com>2014-04-05 11:41:43 +0200
commit3a6a1e42f11bfee780eda6661865e59d2e747b59 (patch)
tree6d45589b57cf4089eeb658e81f5a87a8c2eca09d
parentd5be696db8e3256d5546252dc3534164a1970edf (diff)
downloadredis-3a6a1e42f11bfee780eda6661865e59d2e747b59.tar.gz
ZRANGEBYLEX and ZREVRANGEBYLEX implementation.
-rw-r--r--src/redis.c8
-rw-r--r--src/redis.h12
-rw-r--r--src/t_zset.c438
3 files changed, 456 insertions, 2 deletions
diff --git a/src/redis.c b/src/redis.c
index f9853c4f8..cc48f1de8 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -176,6 +176,8 @@ struct redisCommand redisCommandTable[] = {
{"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
{"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
+ {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
+ {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
{"zcount",zcountCommand,4,"r",0,NULL,1,1,1,0,0},
{"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"zcard",zcardCommand,2,"r",0,NULL,1,1,1,0,0},
@@ -1346,6 +1348,12 @@ void createSharedObjects(void) {
shared.bulkhdr[j] = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"$%d\r\n",j));
}
+ /* The following two shared objects, minstring and maxstrings, are not
+ * actually used for their value but as a special object meaning
+ * respectively the minimum possible string and the maximum possible
+ * string in string comparisons for the ZRANGEBYLEX command. */
+ shared.minstring = createStringObject("minstring",9);
+ shared.maxstring = createStringObject("maxstring",9);
}
void initServerConfig() {
diff --git a/src/redis.h b/src/redis.h
index e95b939e4..edd37bc18 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -546,7 +546,7 @@ struct sharedObjectsStruct {
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
- *lpush, *emptyscan,
+ *lpush, *emptyscan, *minstring, *maxstring,
*select[REDIS_SHARED_SELECT_CMDS],
*integers[REDIS_SHARED_INTEGERS],
*mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -1130,12 +1130,18 @@ unsigned long aofRewriteBufferSize(void);
/* Sorted sets data type */
-/* Struct to hold a inclusive/exclusive range spec. */
+/* Struct to hold a inclusive/exclusive range spec by score comparison. */
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
+/* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */
+typedef struct {
+ robj *min, *max; /* May be set to shared.(minstring|maxstring) */
+ int minex, maxex; /* are min or max exclusive? */
+} zlexrangespec;
+
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
@@ -1387,6 +1393,8 @@ void zincrbyCommand(redisClient *c);
void zrangeCommand(redisClient *c);
void zrangebyscoreCommand(redisClient *c);
void zrevrangebyscoreCommand(redisClient *c);
+void zrangebylexCommand(redisClient *c);
+void zrevrangebylexCommand(redisClient *c);
void zcountCommand(redisClient *c);
void zrevrangeCommand(redisClient *c);
void zcardCommand(redisClient *c);
diff --git a/src/t_zset.c b/src/t_zset.c
index d5f6023a2..edeec3252 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -431,6 +431,158 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
return REDIS_OK;
}
+/* ------------------------ Lexicographic ranges ---------------------------- */
+
+/* Parse max or min argument of ZRANGEBYLEX.
+ * (foo means foo (open interval)
+ * [foo means foo (closed interval)
+ * - means the min string possible
+ * + means the max string possible
+ *
+ * If the string is valid the *dest pointer is set to the redis object
+ * that will be used for the comparision, and ex will be set to 0 or 1
+ * respectively if the item is exclusive or inclusive. REDIS_OK will be
+ * returned.
+ *
+ * If the stirng is not a valid range REDIS_ERR is returned, and the value
+ * of *dest and *ex is undefined. */
+int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
+ char *c = item->ptr;
+
+ switch(c[0]) {
+ case '+':
+ if (c[1] != '\0') return REDIS_ERR;
+ *ex = 0;
+ *dest = shared.maxstring;
+ incrRefCount(shared.maxstring);
+ return REDIS_OK;
+ case '-':
+ if (c[1] != '\0') return REDIS_ERR;
+ *ex = 0;
+ *dest = shared.minstring;
+ incrRefCount(shared.minstring);
+ return REDIS_OK;
+ case '(':
+ *ex = 1;
+ *dest = createStringObject(c+1,sdslen(c)-1);
+ return REDIS_OK;
+ case '[':
+ *ex = 0;
+ *dest = createStringObject(c+1,sdslen(c)-1);
+ return REDIS_OK;
+ default:
+ return REDIS_ERR;
+ }
+}
+
+/* Populate the rangespec according to the objects min and max. */
+static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
+ if (min->encoding == REDIS_ENCODING_INT ||
+ max->encoding == REDIS_ENCODING_INT) return REDIS_ERR;
+
+ spec->min = spec->max = NULL;
+ if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == REDIS_ERR ||
+ zslParseLexRangeItem(max, &spec->max, &spec->maxex) == REDIS_ERR) {
+ if (spec->min) decrRefCount(spec->min);
+ if (spec->max) decrRefCount(spec->max);
+ return REDIS_ERR;
+ } else {
+ return REDIS_OK;
+ }
+}
+
+/* This is just a wrapper to compareStringObjects() that is able to
+ * handle shared.minstring and shared.maxstring as the equivalent of
+ * -inf and +inf for strings */
+int compareStringObjectsForLexRange(robj *a, robj *b) {
+ if (a == b) return 0; /* This makes sure that we handle inf,inf and
+ -inf,-inf ASAP. One special case less. */
+ if (a == shared.minstring || b == shared.maxstring) return -1;
+ if (a == shared.maxstring || b == shared.minstring) return 1;
+ return compareStringObjects(a,b);
+}
+
+static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
+ return spec->minex ?
+ (compareStringObjectsForLexRange(value,spec->min) > 0) :
+ (compareStringObjectsForLexRange(value,spec->min) >= 0);
+}
+
+static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
+ return spec->maxex ?
+ (compareStringObjectsForLexRange(value,spec->max) < 0) :
+ (compareStringObjectsForLexRange(value,spec->max) <= 0);
+}
+
+/* Returns if there is a part of the zset is in the lex range. */
+int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
+ zskiplistNode *x;
+
+ /* Test for ranges that will always be empty. */
+ if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
+ (compareStringObjects(range->min,range->max) == 0 &&
+ (range->minex || range->maxex)))
+ return 0;
+ x = zsl->tail;
+ if (x == NULL || !zslLexValueGteMin(x->obj,range))
+ return 0;
+ x = zsl->header->level[0].forward;
+ if (x == NULL || !zslLexValueLteMax(x->obj,range))
+ return 0;
+ return 1;
+}
+
+/* Find the first node that is contained in the specified lex range.
+ * Returns NULL when no element is contained in the range. */
+zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec range) {
+ zskiplistNode *x;
+ int i;
+
+ /* If everything is out of range, return early. */
+ if (!zslIsInLexRange(zsl,&range)) return NULL;
+
+ x = zsl->header;
+ for (i = zsl->level-1; i >= 0; i--) {
+ /* Go forward while *OUT* of range. */
+ while (x->level[i].forward &&
+ !zslLexValueGteMin(x->level[i].forward->obj,&range))
+ x = x->level[i].forward;
+ }
+
+ /* This is an inner range, so the next node cannot be NULL. */
+ x = x->level[0].forward;
+ redisAssert(x != NULL);
+
+ /* Check if score <= max. */
+ if (!zslLexValueLteMax(x->obj,&range)) return NULL;
+ return x;
+}
+
+/* Find the last node that is contained in the specified range.
+ * Returns NULL when no element is contained in the range. */
+zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec range) {
+ zskiplistNode *x;
+ int i;
+
+ /* If everything is out of range, return early. */
+ if (!zslIsInLexRange(zsl,&range)) return NULL;
+
+ x = zsl->header;
+ for (i = zsl->level-1; i >= 0; i--) {
+ /* Go forward while *IN* range. */
+ while (x->level[i].forward &&
+ zslLexValueLteMax(x->level[i].forward->obj,&range))
+ x = x->level[i].forward;
+ }
+
+ /* This is an inner range, so this node cannot be NULL. */
+ redisAssert(x != NULL);
+
+ /* Check if score >= min. */
+ if (!zslLexValueGteMin(x->obj,&range)) return NULL;
+ return x;
+}
+
/*-----------------------------------------------------------------------------
* Ziplist-backed sorted set API
*----------------------------------------------------------------------------*/
@@ -456,6 +608,24 @@ double zzlGetScore(unsigned char *sptr) {
return score;
}
+/* Return a ziplist element as a Redis string object.
+ * This simple abstraction can be used to simplifies some code at the
+ * cost of some performance. */
+robj *ziplistGetObject(unsigned char *sptr) {
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+
+ redisAssert(sptr != NULL);
+ redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
+
+ if (vstr) {
+ return createStringObject((char*)vstr,vlen);
+ } else {
+ return createStringObjectFromLongLong(vlong);
+ }
+}
+
/* Compare element in sorted set with given element. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
unsigned char *vstr;
@@ -606,6 +776,97 @@ unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
return NULL;
}
+static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
+ robj *value = ziplistGetObject(p);
+ int res = zslLexValueGteMin(value,spec);
+ decrRefCount(value);
+ return res;
+}
+
+static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
+ robj *value = ziplistGetObject(p);
+ int res = zslLexValueLteMax(value,spec);
+ decrRefCount(value);
+ return res;
+}
+
+/* Returns if there is a part of the zset is in range. Should only be used
+ * internally by zzlFirstInRange and zzlLastInRange. */
+int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
+ unsigned char *p;
+
+ /* Test for ranges that will always be empty. */
+ if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
+ (compareStringObjects(range->min,range->max) == 0 &&
+ (range->minex || range->maxex)))
+ return 0;
+
+ p = ziplistIndex(zl,-2); /* Last element. */
+ if (p == NULL) return 0;
+ if (!zzlLexValueGteMin(p,range))
+ return 0;
+
+ p = ziplistIndex(zl,0); /* First element. */
+ redisAssert(p != NULL);
+ if (!zzlLexValueLteMax(p,range))
+ return 0;
+
+ return 1;
+}
+
+/* Find pointer to the first element contained in the specified lex range.
+ * Returns NULL when no element is contained in the range. */
+unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec range) {
+ unsigned char *eptr = ziplistIndex(zl,0), *sptr;
+
+ /* If everything is out of range, return early. */
+ if (!zzlIsInLexRange(zl,&range)) return NULL;
+
+ while (eptr != NULL) {
+ if (zzlLexValueGteMin(eptr,&range)) {
+ /* Check if score <= max. */
+ if (zzlLexValueLteMax(eptr,&range))
+ return eptr;
+ return NULL;
+ }
+
+ /* Move to next element. */
+ sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
+ redisAssert(sptr != NULL);
+ eptr = ziplistNext(zl,sptr); /* Next element. */
+ }
+
+ return NULL;
+}
+
+/* Find pointer to the last element contained in the specified lex range.
+ * Returns NULL when no element is contained in the range. */
+unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec range) {
+ unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
+
+ /* If everything is out of range, return early. */
+ if (!zzlIsInLexRange(zl,&range)) return NULL;
+
+ while (eptr != NULL) {
+ if (zzlLexValueLteMax(eptr,&range)) {
+ /* Check if score >= min. */
+ if (zzlLexValueGteMin(eptr,&range))
+ return eptr;
+ return NULL;
+ }
+
+ /* Move to previous element by moving to the score of previous element.
+ * When this returns NULL, we know there also is no element. */
+ sptr = ziplistPrev(zl,eptr);
+ if (sptr != NULL)
+ redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
+ else
+ eptr = NULL;
+ }
+
+ return NULL;
+}
+
unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
@@ -2098,6 +2359,183 @@ void zcountCommand(redisClient *c) {
addReplyLongLong(c, count);
}
+/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
+void genericZrangebylexCommand(redisClient *c, int reverse) {
+ zlexrangespec range;
+ robj *key = c->argv[1];
+ robj *zobj;
+ long offset = 0, limit = -1;
+ unsigned long rangelen = 0;
+ void *replylen = NULL;
+ int minidx, maxidx;
+
+ /* Parse the range arguments. */
+ if (reverse) {
+ /* Range is given as [max,min] */
+ maxidx = 2; minidx = 3;
+ } else {
+ /* Range is given as [min,max] */
+ minidx = 2; maxidx = 3;
+ }
+
+ if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
+ addReplyError(c,"min or max not valid string range item");
+ return;
+ }
+
+ /* Parse optional extra arguments. Note that ZCOUNT will exactly have
+ * 4 arguments, so we'll never enter the following code path. */
+ if (c->argc > 4) {
+ int remaining = c->argc - 4;
+ int pos = 4;
+
+ while (remaining) {
+ if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
+ if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
+ (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
+ pos += 3; remaining -= 3;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+ }
+
+ /* Ok, lookup the key and get the range */
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
+
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse) {
+ eptr = zzlLastInLexRange(zl,range);
+ } else {
+ eptr = zzlFirstInLexRange(zl,range);
+ }
+
+ /* No "first" element in the specified interval. */
+ if (eptr == NULL) {
+ addReply(c, shared.emptymultibulk);
+ return;
+ }
+
+ /* Get score pointer for the first element. */
+ redisAssertWithInfo(c,zobj,eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
+
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (eptr && offset--) {
+ if (reverse) {
+ zzlPrev(zl,&eptr,&sptr);
+ } else {
+ zzlNext(zl,&eptr,&sptr);
+ }
+ }
+
+ while (eptr && limit--) {
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zzlLexValueGteMin(eptr,&range)) break;
+ } else {
+ if (!zzlLexValueLteMax(eptr,&range)) break;
+ }
+
+ /* We know the element exists, so ziplistGet should always
+ * succeed. */
+ redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
+
+ rangelen++;
+ if (vstr == NULL) {
+ addReplyBulkLongLong(c,vlong);
+ } else {
+ addReplyBulkCBuffer(c,vstr,vlen);
+ }
+
+ /* Move to next node */
+ if (reverse) {
+ zzlPrev(zl,&eptr,&sptr);
+ } else {
+ zzlNext(zl,&eptr,&sptr);
+ }
+ }
+ } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse) {
+ ln = zslLastInLexRange(zsl,range);
+ } else {
+ ln = zslFirstInLexRange(zsl,range);
+ }
+
+ /* No "first" element in the specified interval. */
+ if (ln == NULL) {
+ addReply(c, shared.emptymultibulk);
+ return;
+ }
+
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (ln && offset--) {
+ if (reverse) {
+ ln = ln->backward;
+ } else {
+ ln = ln->level[0].forward;
+ }
+ }
+
+ while (ln && limit--) {
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zslLexValueGteMin(ln->obj,&range)) break;
+ } else {
+ if (!zslLexValueLteMax(ln->obj,&range)) break;
+ }
+
+ rangelen++;
+ addReplyBulk(c,ln->obj);
+
+ /* Move to next node */
+ if (reverse) {
+ ln = ln->backward;
+ } else {
+ ln = ln->level[0].forward;
+ }
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
+ }
+
+ setDeferredMultiBulkLength(c, replylen, rangelen);
+}
+
+void zrangebylexCommand(redisClient *c) {
+ genericZrangebylexCommand(c,0);
+}
+
+void zrevrangebylexCommand(redisClient *c) {
+ genericZrangebylexCommand(c,1);
+}
+
void zcardCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;