diff options
author | antirez <antirez@gmail.com> | 2018-05-11 18:00:32 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2018-05-11 18:00:32 +0200 |
commit | 56bbab238a0fb0c14e1ca0cdb0ebadc8fb8dc945 (patch) | |
tree | 029dc8415d4d9ebbb5112d2931601b366f81e196 /src/t_zset.c | |
parent | 6efb6c1e069b414305a92cf57cee31d13a84a44a (diff) | |
download | redis-56bbab238a0fb0c14e1ca0cdb0ebadc8fb8dc945.tar.gz |
ZPOP: change sync ZPOP to have a count argument instead of N keys.
Usually blocking operations make a lot of sense with multiple keys so
that we can listen to multiple queues (or whatever the app models) with
a single connection. However in the synchronous case it is more useful
to be able to ask for N elements. This is a change that I also wanted to
perform soon or later in the blocking list variant, but here it is more
natural since there is no reply type difference.
Diffstat (limited to 'src/t_zset.c')
-rw-r--r-- | src/t_zset.c | 147 |
1 files changed, 93 insertions, 54 deletions
diff --git a/src/t_zset.c b/src/t_zset.c index b58e58bc3..972412046 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3070,13 +3070,28 @@ void zscanCommand(client *c) { } /* This command implements the generic zpop operation, used by: - * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */ -void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { + * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used + * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX. + * + * If 'emitkey' is true also the key name is emitted, useful for the blocking + * behavior of BZPOP[MIN|MAX], since we can block into multiple keys. + * + * The synchronous version instead does not need to emit the key, but may + * use the 'count' argument to return multiple items if available. */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) { int idx; robj *key; robj *zobj; sds ele; double score; + long count = 1; + + /* If a count argument as passed, parse it or return an error. */ + if (countarg) { + if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK) + return; + if (count < 0) count = 1; + } /* Check type and break on the first error, otherwise identify candidate. */ idx = 0; @@ -3094,70 +3109,94 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { return; } - if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { - unsigned char *zl = zobj->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vlong; + void *arraylen_ptr = addDeferredMultiBulkLength(c); + long arraylen = 0; - /* Get the first or last element in the sorted set. */ - eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); - serverAssertWithInfo(c,zobj,eptr != NULL); - serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (vstr == NULL) - ele = sdsfromlonglong(vlong); - else - ele = sdsnewlen(vstr,vlen); + /* We emit the key only for the blocking variant. */ + if (emitkey) addReplyBulk(c,key); - /* Get the score. */ - sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(c,zobj,sptr != NULL); - score = zzlGetScore(sptr); - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplist *zsl = zs->zsl; - zskiplistNode *zln; + /* Remove the element. */ + do { + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; - // Get the first or last element in the sorted set - zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward); + /* Get the first or last element in the sorted set. */ + eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); + serverAssertWithInfo(c,zobj,eptr != NULL); + serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen(vstr,vlen); - // There must be an element in the sorted set - serverAssertWithInfo(c,zobj,zln != NULL); - ele = sdsdup(zln->ele); - score = zln->score; - } else { - serverPanic("Unknown sorted set encoding"); - } + /* Get the score. */ + sptr = ziplistNext(zl,eptr); + serverAssertWithInfo(c,zobj,sptr != NULL); + score = zzlGetScore(sptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zln; + + /* Get the first or last element in the sorted set. */ + zln = (where == ZSET_MAX ? zsl->tail : + zsl->header->level[0].forward); + + /* There must be an element in the sorted set. */ + serverAssertWithInfo(c,zobj,zln != NULL); + ele = sdsdup(zln->ele); + score = zln->score; + } else { + serverPanic("Unknown sorted set encoding"); + } - // Remove the element - serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); - server.dirty++; - signalModifiedKey(c->db,key); - char *events[2] = {"zpopmin","zpopmax"}; - notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); + serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); + server.dirty++; - // Remove the key, if indeed needed - if (zsetLength(zobj) == 0) { - dbDelete(c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); - } + if (arraylen == 0) { /* Do this only for the first iteration. */ + char *events[2] = {"zpopmin","zpopmax"}; + notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); + signalModifiedKey(c->db,key); + } - addReplyMultiBulkLen(c,3); - addReplyBulk(c,key); - addReplyDouble(c,score); - addReplyBulkCBuffer(c,ele,sdslen(ele)); - sdsfree(ele); + addReplyDouble(c,score); + addReplyBulkCBuffer(c,ele,sdslen(ele)); + sdsfree(ele); + arraylen += 2; + + /* Remove the key, if indeed needed. */ + if (zsetLength(zobj) == 0) { + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + break; + } + } while(--count); + + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0)); } -// ZPOPMIN key [key ...] +/* ZPOPMIN key [<count>] */ void zpopminCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN); + if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN,0, + c->argc == 3 ? c->argv[2] : NULL); } -// ZMAXPOP key [key ...] +/* ZMAXPOP key [<count>] */ void zpopmaxCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX); + if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX,0, + c->argc == 3 ? c->argv[2] : NULL); } /* BZPOPMIN / BZPOPMAX actual implementation. */ @@ -3178,7 +3217,7 @@ void blockingGenericZpopCommand(client *c, int where) { } else { if (zsetLength(o) != 0) { /* Non empty zset, this is like a normal Z[REV]POP. */ - genericZpopCommand(c,&c->argv[j],1,where); + genericZpopCommand(c,&c->argv[j],1,where,1,NULL); /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ rewriteClientCommandVector(c,2, where == ZSET_MAX ? shared.zpopmax : shared.zpopmin, |