diff options
author | Itamar Haber <itamar@redislabs.com> | 2020-12-25 21:49:24 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-25 21:49:24 +0200 |
commit | f44186e5755e943f39c1446ad9576ab97de5039d (patch) | |
tree | 15ebfaba6b54a41657c10c7c1b147465d6dddfb1 /src/t_list.c | |
parent | e18068d9d83e6c7298b5501ffabfdb7709984fc0 (diff) | |
download | redis-f44186e5755e943f39c1446ad9576ab97de5039d.tar.gz |
Adds count to L/RPOP (#8179)
Adds: `L/RPOP <key> [count]`
Implements no. 2 of the following strategies:
1. Loop on listTypePop - this would result in multiple calls for memory freeing and allocating (see https://github.com/redis/redis/pull/8179/commits/769167a079b0e110d28e4a8099dce1ecd45682b5)
2. Iterate the range to build the reply, then call quickListDelRange - this requires two iterations and **is the current choice**
3. Refactor quicklist to have a pop variant of quickListDelRange - probably optimal but more complex
Also:
* There's a historical check for NULL after calling listTypePop that was converted to an assert.
* This refactors common logic shared between LRANGE and the new form of LPOP/RPOP into addListRangeReply (adds test for b/w compat)
* Consequently, it may have made sense to have `LRANGE l -1 -2` and `LRANGE l 9 0` be legit and return a reverse reply. Due to historical reasons that would be, however, a breaking change.
* Added minimal comments to existing commands to adhere to the style, make core dev life easier and get commit karma, naturally.
Diffstat (limited to 'src/t_list.c')
-rw-r--r-- | src/t_list.c | 156 |
1 files changed, 112 insertions, 44 deletions
diff --git a/src/t_list.c b/src/t_list.c index c8323c612..b64b37a8a 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -206,7 +206,7 @@ robj *listTypeDup(robj *o) { lobj->encoding = OBJ_ENCODING_QUICKLIST; break; default: - serverPanic("Wrong encoding."); + serverPanic("Unknown list encoding"); break; } return lobj; @@ -216,6 +216,7 @@ robj *listTypeDup(robj *o) { * List Commands *----------------------------------------------------------------------------*/ +/* Implements LPUSH/RPUSH. */ void pushGenericCommand(client *c, int where) { int j, pushed = 0; robj *lobj = lookupKeyWrite(c->db,c->argv[1]); @@ -244,14 +245,17 @@ void pushGenericCommand(client *c, int where) { server.dirty += pushed; } +/* LPUSH <key> <element> [<element> ...] */ void lpushCommand(client *c) { pushGenericCommand(c,LIST_HEAD); } +/* RPUSH <key> <element> [<element> ...] */ void rpushCommand(client *c) { pushGenericCommand(c,LIST_TAIL); } +/* Implements LPUSHX/RPUSHX. */ void pushxGenericCommand(client *c, int where) { int j, pushed = 0; robj *subject; @@ -274,14 +278,17 @@ void pushxGenericCommand(client *c, int where) { server.dirty += pushed; } +/* LPUSHX <key> <element> [<element> ...] */ void lpushxCommand(client *c) { pushxGenericCommand(c,LIST_HEAD); } +/* RPUSH <key> <element> [<element> ...] */ void rpushxCommand(client *c) { pushxGenericCommand(c,LIST_TAIL); } +/* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */ void linsertCommand(client *c) { int where; robj *subject; @@ -326,12 +333,14 @@ void linsertCommand(client *c) { addReplyLongLong(c,listTypeLength(subject)); } +/* LLEN <key> */ void llenCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); if (o == NULL || checkType(c,o,OBJ_LIST)) return; addReplyLongLong(c,listTypeLength(o)); } +/* LINDEX <key> <index> */ void lindexCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]); if (o == NULL || checkType(c,o,OBJ_LIST)) return; @@ -359,6 +368,7 @@ void lindexCommand(client *c) { } } +/* LSET <key> <index> <element> */ void lsetCommand(client *c) { robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_LIST)) return; @@ -385,7 +395,53 @@ void lsetCommand(client *c) { } } -void listElementsRemoved(client *c, robj *key, int where, robj *o) { +/* A helper for replying with a list's range between the inclusive start and end + * indexes as multi-bulk, with support for negative indexes. Note that start + * must be less than end or an empty array is returned. When the reverse + * argument is set to a non-zero value, the reply is reversed so that elements + * are returned from end to start. */ +void addListRangeReply(client *c, robj *o, long start, long end, int reverse) { + long rangelen, llen = listTypeLength(o); + + /* Convert negative indexes. */ + if (start < 0) start = llen+start; + if (end < 0) end = llen+end; + if (start < 0) start = 0; + + /* Invariant: start >= 0, so this test will be true when end < 0. + * The range is empty when start > end or start >= length. */ + if (start > end || start >= llen) { + addReply(c,shared.emptyarray); + return; + } + if (end >= llen) end = llen-1; + rangelen = (end-start)+1; + + /* Return the result in form of a multi-bulk reply */ + addReplyArrayLen(c,rangelen); + if (o->encoding == OBJ_ENCODING_QUICKLIST) { + int from = reverse ? end : start; + int direction = reverse ? LIST_HEAD : LIST_TAIL; + listTypeIterator *iter = listTypeInitIterator(o,from,direction); + + while(rangelen--) { + listTypeEntry entry; + listTypeNext(iter, &entry); + quicklistEntry *qe = &entry.entry; + if (qe->value) { + addReplyBulkCBuffer(c,qe->value,qe->sz); + } else { + addReplyBulkLongLong(c,qe->longval); + } + } + listTypeReleaseIterator(iter); + } else { + serverPanic("Unknown list encoding"); + } +} + +/* A housekeeping helper for list elements popping tasks. */ +void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) { char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id); @@ -394,78 +450,84 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o) { dbDelete(c->db, key); } signalModifiedKey(c, c->db, key); - server.dirty++; + server.dirty += count; } +/* Implements the generic list pop operation for LPOP/RPOP. + * The where argument specifies which end of the list is operated on. An + * optional count may be provided as the third argument of the client's + * command. */ void popGenericCommand(client *c, int where) { + long count = 0; + robj *value; + + if (c->argc > 3) { + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + c->cmd->name); + return; + } else if (c->argc == 3) { + /* Parse the optional count argument. */ + if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK) + return; + if (count == 0) { + /* Fast exit path. */ + addReplyNullArray(c); + return; + } + } + robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]); if (o == NULL || checkType(c, o, OBJ_LIST)) return; - robj *value = listTypePop(o, where); - if (value == NULL) { - addReplyNull(c); - } else { + if (!count) { + /* Pop a single element. This is POP's original behavior that replies + * with a bulk string. */ + value = listTypePop(o,where); + serverAssert(value != NULL); addReplyBulk(c,value); decrRefCount(value); - listElementsRemoved(c,c->argv[1],where,o); + listElementsRemoved(c,c->argv[1],where,o,1); + } else { + /* Pop a range of elements. An addition to the original POP command, + * which replies with a multi-bulk. */ + long llen = listTypeLength(o); + long rangelen = (count > llen) ? llen : count; + long rangestart = (where == LIST_HEAD) ? 0 : -rangelen; + long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1; + int reverse = (where == LIST_HEAD) ? 0 : 1; + + addListRangeReply(c,o,rangestart,rangeend,reverse); + quicklistDelRange(o->ptr,rangestart,rangelen); + listElementsRemoved(c,c->argv[1],where,o,rangelen); } } +/* LPOP <key> [count] */ void lpopCommand(client *c) { popGenericCommand(c,LIST_HEAD); } +/* RPOP <key> [count] */ void rpopCommand(client *c) { popGenericCommand(c,LIST_TAIL); } +/* LRANGE <key> <start> <stop> */ void lrangeCommand(client *c) { robj *o; - long start, end, llen, rangelen; + long start, end; if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || checkType(c,o,OBJ_LIST)) return; - llen = listTypeLength(o); - - /* convert negative indexes */ - if (start < 0) start = llen+start; - if (end < 0) end = llen+end; - if (start < 0) start = 0; - - /* Invariant: start >= 0, so this test will be true when end < 0. - * The range is empty when start > end or start >= length. */ - if (start > end || start >= llen) { - addReply(c,shared.emptyarray); - return; - } - if (end >= llen) end = llen-1; - rangelen = (end-start)+1; - - /* Return the result in form of a multi-bulk reply */ - addReplyArrayLen(c,rangelen); - if (o->encoding == OBJ_ENCODING_QUICKLIST) { - listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL); - while(rangelen--) { - listTypeEntry entry; - listTypeNext(iter, &entry); - quicklistEntry *qe = &entry.entry; - if (qe->value) { - addReplyBulkCBuffer(c,qe->value,qe->sz); - } else { - addReplyBulkLongLong(c,qe->longval); - } - } - listTypeReleaseIterator(iter); - } else { - serverPanic("List encoding is not QUICKLIST!"); - } + addListRangeReply(c,o,start,end,0); } +/* LTRIM <key> <start> <stop> */ void ltrimCommand(client *c) { robj *o; long start, end, llen, ltrim, rtrim; @@ -629,6 +691,7 @@ void lposCommand(client *c) { } } +/* LREM <key> <count> <element> */ void lremCommand(client *c) { robj *subject, *obj; obj = c->argv[3]; @@ -756,6 +819,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) { } } +/* LMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) */ void lmoveCommand(client *c) { int wherefrom, whereto; if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom) @@ -888,7 +952,7 @@ void blockingPopGenericCommand(client *c, int where) { addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); - listElementsRemoved(c,c->argv[j],where,o); + listElementsRemoved(c,c->argv[j],where,o,1); /* Replicate it as an [LR]POP instead of B[LR]POP. */ rewriteClientCommandVector(c,2, @@ -912,10 +976,12 @@ void blockingPopGenericCommand(client *c, int where) { blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL); } +/* BLPOP <key> [<key> ...] <timeout> */ void blpopCommand(client *c) { blockingPopGenericCommand(c,LIST_HEAD); } +/* BLPOP <key> [<key> ...] <timeout> */ void brpopCommand(client *c) { blockingPopGenericCommand(c,LIST_TAIL); } @@ -942,6 +1008,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou } } +/* BLMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) <timeout> */ void blmoveCommand(client *c) { mstime_t timeout; int wherefrom, whereto; @@ -954,6 +1021,7 @@ void blmoveCommand(client *c) { blmoveGenericCommand(c,wherefrom,whereto,timeout); } +/* BRPOPLPUSH <source> <destination> <timeout> */ void brpoplpushCommand(client *c) { mstime_t timeout; if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) |