summaryrefslogtreecommitdiff
path: root/src/t_zset.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_zset.c')
-rw-r--r--src/t_zset.c144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/t_zset.c b/src/t_zset.c
index f7f4c6eb2..2b11c7d37 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -3068,3 +3068,147 @@ void zscanCommand(client *c) {
checkType(c,o,OBJ_ZSET)) return;
scanGenericCommand(c,o,cursor);
}
+
+/* This command implements the generic zpop operation, used by:
+ * ZPOP, ZREVPOP, BZPOP and BZREVPOP */
+void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
+ int idx;
+ robj *key;
+ robj *zobj;
+ sds ele;
+ double score;
+ char *events[2] = {"zpop","zrevpop"};
+
+ // Check type and break on the first error, otherwise identify candidate
+ idx = 0;
+ while (idx < keyc) {
+ key = keyv[idx++];
+ zobj = lookupKeyWrite(c->db,key);
+ if (!zobj) continue;
+ if (checkType(c,zobj,OBJ_ZSET)) return;
+ break;
+ }
+
+ // No candidate for zpopping, return empty
+ if (!zobj) {
+ addReply(c,shared.emptymultibulk);
+ return;
+ }
+
+ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+
+ // Get the first or last element in the sorted set
+ eptr = ziplistIndex(zl,reverse ? -2 : 0);
+ serverAssertWithInfo(c,zobj,eptr != NULL);
+
+ // There must be an element in the sorted set
+ serverAssertWithInfo(c,zobj,eptr != NULL);
+ serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
+ if (vstr == NULL)
+ ele = sdsfromlonglong(vlong);
+ else
+ ele = sdsnewlen(vstr,vlen);
+
+ // Get the score
+ sptr = ziplistNext(zl,eptr);
+ serverAssertWithInfo(c,zobj,sptr != NULL);
+ score = zzlGetScore(sptr);
+ } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+
+ // Get the first or last element in the sorted set
+ ln = (reverse ? zsl->tail : zsl->header->level[0].forward);
+
+ // There must be an element in the sorted set
+ serverAssertWithInfo(c,zobj,ln != NULL);
+ ele = sdsdup(ln->ele);
+ score = ln->score;
+ } else {
+ serverPanic("Unknown sorted set encoding");
+ }
+
+ // Remove the element
+ serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
+ server.dirty++;
+ signalModifiedKey(c->db,key);
+ notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id);
+
+ // Remove the key, if indeed needed
+ if (zsetLength(zobj) == 0) {
+ dbDelete(c->db,key);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
+ }
+
+ addReplyMultiBulkLen(c,3);
+ addReplyBulk(c,key);
+ addReplyDouble(c,score);
+ addReplyBulkCBuffer(c,ele,sdslen(ele));
+ sdsfree(ele);
+}
+
+// ZPOP key [key ...]
+void zpopCommand(client *c) {
+ genericZpopCommand(c,&c->argv[1],c->argc-1,0);
+}
+
+// ZREVPOP key [key ...]
+void zrevpopCommand(client *c) {
+ genericZpopCommand(c,&c->argv[1],c->argc-1,1);
+}
+
+/* Blocking Z[REV]POP */
+void blockingGenericZpopCommand(client *c, int reverse) {
+ robj *o;
+ mstime_t timeout;
+ int j;
+
+ if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
+ != C_OK) return;
+
+ for (j = 1; j < c->argc-1; j++) {
+ o = lookupKeyWrite(c->db,c->argv[j]);
+ if (o != NULL) {
+ if (o->type != OBJ_ZSET) {
+ addReply(c,shared.wrongtypeerr);
+ return;
+ } else {
+ if (zsetLength(o) != 0) {
+ /* Non empty zset, this is like a normal Z[REV]POP. */
+ genericZpopCommand(c,&c->argv[j],1,reverse);
+ /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
+ rewriteClientCommandVector(c,2,
+ reverse ? shared.zrevpop : shared.zpop,
+ c->argv[j]);
+ return;
+ }
+ }
+ }
+ }
+
+ /* If we are inside a MULTI/EXEC and the zset is empty the only thing
+ * we can do is treating it as a timeout (even with timeout 0). */
+ if (c->flags & CLIENT_MULTI) {
+ addReply(c,shared.nullmultibulk);
+ return;
+ }
+
+ /* If the keys do not exist we must block */
+ blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
+}
+
+// BZPOP key [key ...] timeout
+void bzpopCommand(client *c) {
+ blockingGenericZpopCommand(c,0);
+}
+
+// BZREVPOP key [key ...] timeout
+void bzrevpopCommand(client *c) {
+ blockingGenericZpopCommand(c,1);
+}