diff options
Diffstat (limited to 'src/t_zset.c')
-rw-r--r-- | src/t_zset.c | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/src/t_zset.c b/src/t_zset.c index f7f4c6eb2..2b11c7d37 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3068,3 +3068,147 @@ void zscanCommand(client *c) { checkType(c,o,OBJ_ZSET)) return; scanGenericCommand(c,o,cursor); } + +/* This command implements the generic zpop operation, used by: + * ZPOP, ZREVPOP, BZPOP and BZREVPOP */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { + int idx; + robj *key; + robj *zobj; + sds ele; + double score; + char *events[2] = {"zpop","zrevpop"}; + + // Check type and break on the first error, otherwise identify candidate + idx = 0; + while (idx < keyc) { + key = keyv[idx++]; + zobj = lookupKeyWrite(c->db,key); + if (!zobj) continue; + if (checkType(c,zobj,OBJ_ZSET)) return; + break; + } + + // No candidate for zpopping, return empty + if (!zobj) { + addReply(c,shared.emptymultibulk); + return; + } + + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + // Get the first or last element in the sorted set + eptr = ziplistIndex(zl,reverse ? -2 : 0); + serverAssertWithInfo(c,zobj,eptr != NULL); + + // There must be an element in the sorted set + serverAssertWithInfo(c,zobj,eptr != NULL); + serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen(vstr,vlen); + + // Get the score + sptr = ziplistNext(zl,eptr); + serverAssertWithInfo(c,zobj,sptr != NULL); + score = zzlGetScore(sptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + + // Get the first or last element in the sorted set + ln = (reverse ? zsl->tail : zsl->header->level[0].forward); + + // There must be an element in the sorted set + serverAssertWithInfo(c,zobj,ln != NULL); + ele = sdsdup(ln->ele); + score = ln->score; + } else { + serverPanic("Unknown sorted set encoding"); + } + + // Remove the element + serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); + server.dirty++; + signalModifiedKey(c->db,key); + notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id); + + // Remove the key, if indeed needed + if (zsetLength(zobj) == 0) { + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + } + + addReplyMultiBulkLen(c,3); + addReplyBulk(c,key); + addReplyDouble(c,score); + addReplyBulkCBuffer(c,ele,sdslen(ele)); + sdsfree(ele); +} + +// ZPOP key [key ...] +void zpopCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,0); +} + +// ZREVPOP key [key ...] +void zrevpopCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,1); +} + +/* Blocking Z[REV]POP */ +void blockingGenericZpopCommand(client *c, int reverse) { + robj *o; + mstime_t timeout; + int j; + + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) + != C_OK) return; + + for (j = 1; j < c->argc-1; j++) { + o = lookupKeyWrite(c->db,c->argv[j]); + if (o != NULL) { + if (o->type != OBJ_ZSET) { + addReply(c,shared.wrongtypeerr); + return; + } else { + if (zsetLength(o) != 0) { + /* Non empty zset, this is like a normal Z[REV]POP. */ + genericZpopCommand(c,&c->argv[j],1,reverse); + /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ + rewriteClientCommandVector(c,2, + reverse ? shared.zrevpop : shared.zpop, + c->argv[j]); + return; + } + } + } + } + + /* If we are inside a MULTI/EXEC and the zset is empty the only thing + * we can do is treating it as a timeout (even with timeout 0). */ + if (c->flags & CLIENT_MULTI) { + addReply(c,shared.nullmultibulk); + return; + } + + /* If the keys do not exist we must block */ + blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL); +} + +// BZPOP key [key ...] timeout +void bzpopCommand(client *c) { + blockingGenericZpopCommand(c,0); +} + +// BZREVPOP key [key ...] timeout +void bzrevpopCommand(client *c) { + blockingGenericZpopCommand(c,1); +} |