summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-05-11 18:00:32 +0200
committerantirez <antirez@gmail.com>2018-05-11 18:00:32 +0200
commit56bbab238a0fb0c14e1ca0cdb0ebadc8fb8dc945 (patch)
tree029dc8415d4d9ebbb5112d2931601b366f81e196
parent6efb6c1e069b414305a92cf57cee31d13a84a44a (diff)
downloadredis-56bbab238a0fb0c14e1ca0cdb0ebadc8fb8dc945.tar.gz
ZPOP: change sync ZPOP to have a count argument instead of N keys.
Usually blocking operations make a lot of sense with multiple keys so that we can listen to multiple queues (or whatever the app models) with a single connection. However in the synchronous case it is more useful to be able to ask for N elements. This is a change that I also wanted to perform soon or later in the blocking list variant, but here it is more natural since there is no reply type difference.
-rw-r--r--src/blocked.c2
-rw-r--r--src/server.h2
-rw-r--r--src/t_zset.c147
3 files changed, 95 insertions, 56 deletions
diff --git a/src/blocked.c b/src/blocked.c
index f9fd94ea1..f4b47bb82 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -331,7 +331,7 @@ void handleClientsBlockedOnKeys(void) {
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
- genericZpopCommand(receiver,&rl->key,1,where);
+ genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
propagate(where == ZSET_MIN ?
server.zpopminCommand : server.zpopmaxCommand,
diff --git a/src/server.h b/src/server.h
index b5675b476..d9c512c5e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1632,7 +1632,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
long zsetRank(robj *zobj, sds ele, int reverse);
int zsetDel(robj *zobj, sds ele);
-void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse);
+void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
sds ziplistGetObject(unsigned char *sptr);
int zslValueGteMin(double value, zrangespec *spec);
int zslValueLteMax(double value, zrangespec *spec);
diff --git a/src/t_zset.c b/src/t_zset.c
index b58e58bc3..972412046 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -3070,13 +3070,28 @@ void zscanCommand(client *c) {
}
/* This command implements the generic zpop operation, used by:
- * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */
-void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
+ * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
+ * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
+ *
+ * If 'emitkey' is true also the key name is emitted, useful for the blocking
+ * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
+ *
+ * The synchronous version instead does not need to emit the key, but may
+ * use the 'count' argument to return multiple items if available. */
+void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
int idx;
robj *key;
robj *zobj;
sds ele;
double score;
+ long count = 1;
+
+ /* If a count argument as passed, parse it or return an error. */
+ if (countarg) {
+ if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
+ return;
+ if (count < 0) count = 1;
+ }
/* Check type and break on the first error, otherwise identify candidate. */
idx = 0;
@@ -3094,70 +3109,94 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
return;
}
- if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
- unsigned char *zl = zobj->ptr;
- unsigned char *eptr, *sptr;
- unsigned char *vstr;
- unsigned int vlen;
- long long vlong;
+ void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ long arraylen = 0;
- /* Get the first or last element in the sorted set. */
- eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
- serverAssertWithInfo(c,zobj,eptr != NULL);
- serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
- if (vstr == NULL)
- ele = sdsfromlonglong(vlong);
- else
- ele = sdsnewlen(vstr,vlen);
+ /* We emit the key only for the blocking variant. */
+ if (emitkey) addReplyBulk(c,key);
- /* Get the score. */
- sptr = ziplistNext(zl,eptr);
- serverAssertWithInfo(c,zobj,sptr != NULL);
- score = zzlGetScore(sptr);
- } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
- zset *zs = zobj->ptr;
- zskiplist *zsl = zs->zsl;
- zskiplistNode *zln;
+ /* Remove the element. */
+ do {
+ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
- // Get the first or last element in the sorted set
- zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward);
+ /* Get the first or last element in the sorted set. */
+ eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
+ serverAssertWithInfo(c,zobj,eptr != NULL);
+ serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
+ if (vstr == NULL)
+ ele = sdsfromlonglong(vlong);
+ else
+ ele = sdsnewlen(vstr,vlen);
- // There must be an element in the sorted set
- serverAssertWithInfo(c,zobj,zln != NULL);
- ele = sdsdup(zln->ele);
- score = zln->score;
- } else {
- serverPanic("Unknown sorted set encoding");
- }
+ /* Get the score. */
+ sptr = ziplistNext(zl,eptr);
+ serverAssertWithInfo(c,zobj,sptr != NULL);
+ score = zzlGetScore(sptr);
+ } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *zln;
+
+ /* Get the first or last element in the sorted set. */
+ zln = (where == ZSET_MAX ? zsl->tail :
+ zsl->header->level[0].forward);
+
+ /* There must be an element in the sorted set. */
+ serverAssertWithInfo(c,zobj,zln != NULL);
+ ele = sdsdup(zln->ele);
+ score = zln->score;
+ } else {
+ serverPanic("Unknown sorted set encoding");
+ }
- // Remove the element
- serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
- server.dirty++;
- signalModifiedKey(c->db,key);
- char *events[2] = {"zpopmin","zpopmax"};
- notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
+ serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
+ server.dirty++;
- // Remove the key, if indeed needed
- if (zsetLength(zobj) == 0) {
- dbDelete(c->db,key);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
- }
+ if (arraylen == 0) { /* Do this only for the first iteration. */
+ char *events[2] = {"zpopmin","zpopmax"};
+ notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
+ signalModifiedKey(c->db,key);
+ }
- addReplyMultiBulkLen(c,3);
- addReplyBulk(c,key);
- addReplyDouble(c,score);
- addReplyBulkCBuffer(c,ele,sdslen(ele));
- sdsfree(ele);
+ addReplyDouble(c,score);
+ addReplyBulkCBuffer(c,ele,sdslen(ele));
+ sdsfree(ele);
+ arraylen += 2;
+
+ /* Remove the key, if indeed needed. */
+ if (zsetLength(zobj) == 0) {
+ dbDelete(c->db,key);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
+ break;
+ }
+ } while(--count);
+
+ setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
}
-// ZPOPMIN key [key ...]
+/* ZPOPMIN key [<count>] */
void zpopminCommand(client *c) {
- genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN);
+ if (c->argc > 3) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN,0,
+ c->argc == 3 ? c->argv[2] : NULL);
}
-// ZMAXPOP key [key ...]
+/* ZMAXPOP key [<count>] */
void zpopmaxCommand(client *c) {
- genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX);
+ if (c->argc > 3) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX,0,
+ c->argc == 3 ? c->argv[2] : NULL);
}
/* BZPOPMIN / BZPOPMAX actual implementation. */
@@ -3178,7 +3217,7 @@ void blockingGenericZpopCommand(client *c, int where) {
} else {
if (zsetLength(o) != 0) {
/* Non empty zset, this is like a normal Z[REV]POP. */
- genericZpopCommand(c,&c->argv[j],1,where);
+ genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
/* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
rewriteClientCommandVector(c,2,
where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,