summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorBinbin <binloveplay1314@qq.com>2021-09-09 17:02:33 +0800
committerGitHub <noreply@github.com>2021-09-09 12:02:33 +0300
commitc50af0aeba693bf93e5c471cff08d8ccde0f5213 (patch)
tree5140419b0af05e02c0d88a737d8349d4d2eafdf9 /src/blocked.c
parent216f168b2b144ecf3d082fa9df20970901479d32 (diff)
downloadredis-c50af0aeba693bf93e5c471cff08d8ccde0f5213.tar.gz
Add LMPOP/BLMPOP commands. (#9373)
We want to add COUNT option for BLPOP. But we can't do it without breaking compatibility due to the command arguments syntax. So this commit introduce two new commands. Syntax for the new LMPOP command: `LMPOP numkeys [<key> ...] LEFT|RIGHT [COUNT count]` Syntax for the new BLMPOP command: `BLMPOP timeout numkeys [<key> ...] LEFT|RIGHT [COUNT count]` Some background: - LPOP takes one key, and can return multiple elements. - BLPOP takes multiple keys, but returns one element from just one key. - LMPOP can take multiple keys and return multiple elements from just one key. Note that LMPOP/BLMPOP can take multiple keys, it eventually operates on just one key. And it will propagate as LPOP or RPOP with the COUNT option. As a new command, it still return NIL if we can't pop any elements. For the normal response is nested arrays in RESP2 and RESP3, like: ``` LMPOP/BLMPOP 1) keyname 2) 1) element1 2) element2 ``` I.e. unlike BLPOP that returns a key name and one element so it uses a flat array, and LPOP that returns multiple elements with no key name, and again uses a flat array, this one has to return a nested array, and it does for for both RESP2 and RESP3 (like SCAN does) Some discuss can see: #766 #8824
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c57
1 files changed, 24 insertions, 33 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 6ce4b9893..68d3112a9 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -65,7 +65,7 @@
#include "latency.h"
#include "monotonic.h"
-int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
+void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted);
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
/* This structure represents the blocked key information that we store
@@ -271,6 +271,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
+ int deleted = 0;
while(numclients--) {
listNode *clientnode = listFirst(clients);
@@ -286,41 +287,27 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
robj *dstkey = receiver->bpop.target;
int wherefrom = receiver->bpop.listpos.wherefrom;
int whereto = receiver->bpop.listpos.whereto;
- robj *value = listTypePop(o, wherefrom);
- if (value) {
- /* Protect receiver->bpop.target, that will be
- * freed by the next unblockClient()
- * call. */
- if (dstkey) incrRefCount(dstkey);
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClient()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
- monotime replyTimer;
- elapsedStart(&replyTimer);
- if (serveClientBlockedOnList(receiver,
- rl->key,dstkey,rl->db,value,
- wherefrom, whereto) == C_ERR)
- {
- /* If we failed serving the client we need
- * to also undo the POP operation. */
- listTypePush(o,value,wherefrom);
- }
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
- unblockClient(receiver);
+ monotime replyTimer;
+ elapsedStart(&replyTimer);
+ serveClientBlockedOnList(receiver, o,
+ rl->key, dstkey, rl->db,
+ wherefrom, whereto,
+ &deleted);
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ unblockClient(receiver);
- if (dstkey) decrRefCount(dstkey);
- decrRefCount(value);
- } else {
- break;
- }
- }
- }
+ if (dstkey) decrRefCount(dstkey);
- if (listTypeLength(o) == 0) {
- dbDelete(rl->db,rl->key);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
+ /* The list is empty and has been deleted. */
+ if (deleted) break;
+ }
}
- /* We don't call signalModifiedKey() as it was already called
- * when an element was pushed on the list. */
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
@@ -627,12 +614,16 @@ void handleClientsBlockedOnKeys(void) {
* for all the 'numkeys' keys as in the 'keys' argument. When we block for
* stream keys, we also provide an array of streamID structures: clients will
* be unblocked only when items with an ID greater or equal to the specified
- * one is appended to the stream. */
-void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
+ * one is appended to the stream.
+ *
+ * 'count' for those commands that support the optional count argument.
+ * Otherwise the value is 0. */
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
dictEntry *de;
list *l;
int j;
+ c->bpop.count = count;
c->bpop.timeout = timeout;
c->bpop.target = target;