summaryrefslogtreecommitdiff
path: root/src/t_list.c
diff options
context:
space:
mode:
authorItamar Haber <itamar@redislabs.com>2020-12-25 21:49:24 +0200
committerGitHub <noreply@github.com>2020-12-25 21:49:24 +0200
commitf44186e5755e943f39c1446ad9576ab97de5039d (patch)
tree15ebfaba6b54a41657c10c7c1b147465d6dddfb1 /src/t_list.c
parente18068d9d83e6c7298b5501ffabfdb7709984fc0 (diff)
downloadredis-f44186e5755e943f39c1446ad9576ab97de5039d.tar.gz
Adds count to L/RPOP (#8179)
Adds: `L/RPOP <key> [count]` Implements no. 2 of the following strategies: 1. Loop on listTypePop - this would result in multiple calls for memory freeing and allocating (see https://github.com/redis/redis/pull/8179/commits/769167a079b0e110d28e4a8099dce1ecd45682b5) 2. Iterate the range to build the reply, then call quickListDelRange - this requires two iterations and **is the current choice** 3. Refactor quicklist to have a pop variant of quickListDelRange - probably optimal but more complex Also: * There's a historical check for NULL after calling listTypePop that was converted to an assert. * This refactors common logic shared between LRANGE and the new form of LPOP/RPOP into addListRangeReply (adds test for b/w compat) * Consequently, it may have made sense to have `LRANGE l -1 -2` and `LRANGE l 9 0` be legit and return a reverse reply. Due to historical reasons that would be, however, a breaking change. * Added minimal comments to existing commands to adhere to the style, make core dev life easier and get commit karma, naturally.
Diffstat (limited to 'src/t_list.c')
-rw-r--r--src/t_list.c156
1 files changed, 112 insertions, 44 deletions
diff --git a/src/t_list.c b/src/t_list.c
index c8323c612..b64b37a8a 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -206,7 +206,7 @@ robj *listTypeDup(robj *o) {
lobj->encoding = OBJ_ENCODING_QUICKLIST;
break;
default:
- serverPanic("Wrong encoding.");
+ serverPanic("Unknown list encoding");
break;
}
return lobj;
@@ -216,6 +216,7 @@ robj *listTypeDup(robj *o) {
* List Commands
*----------------------------------------------------------------------------*/
+/* Implements LPUSH/RPUSH. */
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
@@ -244,14 +245,17 @@ void pushGenericCommand(client *c, int where) {
server.dirty += pushed;
}
+/* LPUSH <key> <element> [<element> ...] */
void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}
+/* RPUSH <key> <element> [<element> ...] */
void rpushCommand(client *c) {
pushGenericCommand(c,LIST_TAIL);
}
+/* Implements LPUSHX/RPUSHX. */
void pushxGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *subject;
@@ -274,14 +278,17 @@ void pushxGenericCommand(client *c, int where) {
server.dirty += pushed;
}
+/* LPUSHX <key> <element> [<element> ...] */
void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}
+/* RPUSH <key> <element> [<element> ...] */
void rpushxCommand(client *c) {
pushxGenericCommand(c,LIST_TAIL);
}
+/* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */
void linsertCommand(client *c) {
int where;
robj *subject;
@@ -326,12 +333,14 @@ void linsertCommand(client *c) {
addReplyLongLong(c,listTypeLength(subject));
}
+/* LLEN <key> */
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}
+/* LINDEX <key> <index> */
void lindexCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
@@ -359,6 +368,7 @@ void lindexCommand(client *c) {
}
}
+/* LSET <key> <index> <element> */
void lsetCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
@@ -385,7 +395,53 @@ void lsetCommand(client *c) {
}
}
-void listElementsRemoved(client *c, robj *key, int where, robj *o) {
+/* A helper for replying with a list's range between the inclusive start and end
+ * indexes as multi-bulk, with support for negative indexes. Note that start
+ * must be less than end or an empty array is returned. When the reverse
+ * argument is set to a non-zero value, the reply is reversed so that elements
+ * are returned from end to start. */
+void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
+ long rangelen, llen = listTypeLength(o);
+
+ /* Convert negative indexes. */
+ if (start < 0) start = llen+start;
+ if (end < 0) end = llen+end;
+ if (start < 0) start = 0;
+
+ /* Invariant: start >= 0, so this test will be true when end < 0.
+ * The range is empty when start > end or start >= length. */
+ if (start > end || start >= llen) {
+ addReply(c,shared.emptyarray);
+ return;
+ }
+ if (end >= llen) end = llen-1;
+ rangelen = (end-start)+1;
+
+ /* Return the result in form of a multi-bulk reply */
+ addReplyArrayLen(c,rangelen);
+ if (o->encoding == OBJ_ENCODING_QUICKLIST) {
+ int from = reverse ? end : start;
+ int direction = reverse ? LIST_HEAD : LIST_TAIL;
+ listTypeIterator *iter = listTypeInitIterator(o,from,direction);
+
+ while(rangelen--) {
+ listTypeEntry entry;
+ listTypeNext(iter, &entry);
+ quicklistEntry *qe = &entry.entry;
+ if (qe->value) {
+ addReplyBulkCBuffer(c,qe->value,qe->sz);
+ } else {
+ addReplyBulkLongLong(c,qe->longval);
+ }
+ }
+ listTypeReleaseIterator(iter);
+ } else {
+ serverPanic("Unknown list encoding");
+ }
+}
+
+/* A housekeeping helper for list elements popping tasks. */
+void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
@@ -394,78 +450,84 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o) {
dbDelete(c->db, key);
}
signalModifiedKey(c, c->db, key);
- server.dirty++;
+ server.dirty += count;
}
+/* Implements the generic list pop operation for LPOP/RPOP.
+ * The where argument specifies which end of the list is operated on. An
+ * optional count may be provided as the third argument of the client's
+ * command. */
void popGenericCommand(client *c, int where) {
+ long count = 0;
+ robj *value;
+
+ if (c->argc > 3) {
+ addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
+ c->cmd->name);
+ return;
+ } else if (c->argc == 3) {
+ /* Parse the optional count argument. */
+ if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK)
+ return;
+ if (count == 0) {
+ /* Fast exit path. */
+ addReplyNullArray(c);
+ return;
+ }
+ }
+
robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
if (o == NULL || checkType(c, o, OBJ_LIST))
return;
- robj *value = listTypePop(o, where);
- if (value == NULL) {
- addReplyNull(c);
- } else {
+ if (!count) {
+ /* Pop a single element. This is POP's original behavior that replies
+ * with a bulk string. */
+ value = listTypePop(o,where);
+ serverAssert(value != NULL);
addReplyBulk(c,value);
decrRefCount(value);
- listElementsRemoved(c,c->argv[1],where,o);
+ listElementsRemoved(c,c->argv[1],where,o,1);
+ } else {
+ /* Pop a range of elements. An addition to the original POP command,
+ * which replies with a multi-bulk. */
+ long llen = listTypeLength(o);
+ long rangelen = (count > llen) ? llen : count;
+ long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
+ long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
+ int reverse = (where == LIST_HEAD) ? 0 : 1;
+
+ addListRangeReply(c,o,rangestart,rangeend,reverse);
+ quicklistDelRange(o->ptr,rangestart,rangelen);
+ listElementsRemoved(c,c->argv[1],where,o,rangelen);
}
}
+/* LPOP <key> [count] */
void lpopCommand(client *c) {
popGenericCommand(c,LIST_HEAD);
}
+/* RPOP <key> [count] */
void rpopCommand(client *c) {
popGenericCommand(c,LIST_TAIL);
}
+/* LRANGE <key> <start> <stop> */
void lrangeCommand(client *c) {
robj *o;
- long start, end, llen, rangelen;
+ long start, end;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
|| checkType(c,o,OBJ_LIST)) return;
- llen = listTypeLength(o);
-
- /* convert negative indexes */
- if (start < 0) start = llen+start;
- if (end < 0) end = llen+end;
- if (start < 0) start = 0;
-
- /* Invariant: start >= 0, so this test will be true when end < 0.
- * The range is empty when start > end or start >= length. */
- if (start > end || start >= llen) {
- addReply(c,shared.emptyarray);
- return;
- }
- if (end >= llen) end = llen-1;
- rangelen = (end-start)+1;
-
- /* Return the result in form of a multi-bulk reply */
- addReplyArrayLen(c,rangelen);
- if (o->encoding == OBJ_ENCODING_QUICKLIST) {
- listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
- while(rangelen--) {
- listTypeEntry entry;
- listTypeNext(iter, &entry);
- quicklistEntry *qe = &entry.entry;
- if (qe->value) {
- addReplyBulkCBuffer(c,qe->value,qe->sz);
- } else {
- addReplyBulkLongLong(c,qe->longval);
- }
- }
- listTypeReleaseIterator(iter);
- } else {
- serverPanic("List encoding is not QUICKLIST!");
- }
+ addListRangeReply(c,o,start,end,0);
}
+/* LTRIM <key> <start> <stop> */
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;
@@ -629,6 +691,7 @@ void lposCommand(client *c) {
}
}
+/* LREM <key> <count> <element> */
void lremCommand(client *c) {
robj *subject, *obj;
obj = c->argv[3];
@@ -756,6 +819,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
}
}
+/* LMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) */
void lmoveCommand(client *c) {
int wherefrom, whereto;
if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
@@ -888,7 +952,7 @@ void blockingPopGenericCommand(client *c, int where) {
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
- listElementsRemoved(c,c->argv[j],where,o);
+ listElementsRemoved(c,c->argv[j],where,o,1);
/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
@@ -912,10 +976,12 @@ void blockingPopGenericCommand(client *c, int where) {
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
}
+/* BLPOP <key> [<key> ...] <timeout> */
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}
+/* BLPOP <key> [<key> ...] <timeout> */
void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}
@@ -942,6 +1008,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
}
}
+/* BLMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) <timeout> */
void blmoveCommand(client *c) {
mstime_t timeout;
int wherefrom, whereto;
@@ -954,6 +1021,7 @@ void blmoveCommand(client *c) {
blmoveGenericCommand(c,wherefrom,whereto,timeout);
}
+/* BRPOPLPUSH <source> <destination> <timeout> */
void brpoplpushCommand(client *c) {
mstime_t timeout;
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)