summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-04-08 12:55:57 +0200
committerantirez <antirez@gmail.com>2020-04-08 19:22:56 +0200
commita5e24eabc306f7fcead0a9d2a5f1433401b6c0ce (patch)
tree2a6ee17a113212c6bea02a61c3c973abed1b0d57
parent1f7d08b76dc83fd2aa43ee7fc04ab4ee01016628 (diff)
downloadredis-o1-bpop-5.0.tar.gz
Speedup: unblock clients on keys in O(1).o1-bpop-5.0
See #7071.
-rw-r--r--src/adlist.c20
-rw-r--r--src/adlist.h3
-rw-r--r--src/blocked.c46
-rw-r--r--src/server.c2
4 files changed, 50 insertions, 21 deletions
diff --git a/src/adlist.c b/src/adlist.c
index ec5f8bbf4..0fedc0729 100644
--- a/src/adlist.c
+++ b/src/adlist.c
@@ -327,12 +327,11 @@ listNode *listIndex(list *list, long index) {
}
/* Rotate the list removing the tail node and inserting it to the head. */
-void listRotate(list *list) {
- listNode *tail = list->tail;
-
+void listRotateTailToHead(list *list) {
if (listLength(list) <= 1) return;
/* Detach current tail */
+ listNode *tail = list->tail;
list->tail = tail->prev;
list->tail->next = NULL;
/* Move it as head */
@@ -342,6 +341,21 @@ void listRotate(list *list) {
list->head = tail;
}
+/* Rotate the list removing the head node and inserting it to the tail. */
+void listRotateHeadToTail(list *list) {
+ if (listLength(list) <= 1) return;
+
+ listNode *head = list->head;
+ /* Detach current head */
+ list->head = head->next;
+ list->head->prev = NULL;
+ /* Move it as tail */
+ list->tail->next = head;
+ head->next = NULL;
+ head->prev = list->tail;
+ list->tail = head;
+}
+
/* Add all the elements of the list 'o' at the end of the
* list 'l'. The list 'other' remains empty but otherwise valid. */
void listJoin(list *l, list *o) {
diff --git a/src/adlist.h b/src/adlist.h
index c954fac87..2a83cd977 100644
--- a/src/adlist.h
+++ b/src/adlist.h
@@ -85,7 +85,8 @@ listNode *listSearchKey(list *list, void *key);
listNode *listIndex(list *list, long index);
void listRewind(list *list, listIter *li);
void listRewindTail(list *list, listIter *li);
-void listRotate(list *list);
+void listRotateTailToHead(list *list);
+void listRotateHeadToTail(list *list);
void listJoin(list *l, list *o);
/* Directions for iterators */
diff --git a/src/blocked.c b/src/blocked.c
index b7506173d..d8413b80c 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -67,6 +67,21 @@
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
+/* This structure represents the blocked key information that we store
+ * in the client structure. Each client blocked on keys, has a
+ * client->bpop.keys hash table. The keys of the hash table are Redis
+ * keys pointers to 'robj' structures. The value is this structure.
+ * The structure has two goals: firstly we store the list node that this
+ * client uses to be listed in the database "blocked clients for this key"
+ * list, so we can later unblock in O(1) without a list scan.
+ * Secondly for certain blocking types, we have additional info. Right now
+ * the only use for additional info we have is when clients are blocked
+ * on streams, as we have to remember the ID it blocked for. */
+typedef struct bkinfo {
+ listNode *listnode; /* List node for db->blocking_keys[key] list. */
+ streamID stream_id; /* Stream ID if we blocked in a stream. */
+} bkinfo;
+
/* Get a timeout value from an object and store it into 'timeout'.
* The final timeout is always stored as milliseconds as a time where the
* timeout will expire, however the parsing is performed according to
@@ -291,8 +306,7 @@ void handleClientsBlockedOnKeys(void) {
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
+ listRotateHeadToTail(clients);
continue;
}
@@ -353,8 +367,7 @@ void handleClientsBlockedOnKeys(void) {
if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
+ listRotateHeadToTail(clients);
continue;
}
@@ -398,8 +411,9 @@ void handleClientsBlockedOnKeys(void) {
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM) continue;
- streamID *gt = dictFetchValue(receiver->bpop.keys,
- rl->key);
+ bkinfo *bki = dictFetchValue(receiver->bpop.keys,
+ rl->key);
+ streamID *gt = &bki->stream_id;
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
@@ -516,17 +530,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
- /* The value associated with the key name in the bpop.keys dictionary
- * is NULL for lists and sorted sets, or the stream ID for streams. */
- void *key_data = NULL;
- if (btype == BLOCKED_STREAM) {
- key_data = zmalloc(sizeof(streamID));
- memcpy(key_data,ids+j,sizeof(streamID));
- }
+ /* Allocate our bkinfo structure, associated to each key the client
+ * is blocked for. */
+ bkinfo *bki = zmalloc(sizeof(*bki));
+ if (btype == BLOCKED_STREAM)
+ bki->stream_id = ids[j];
/* If the key already exists in the dictionary ignore it. */
- if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
- zfree(key_data);
+ if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
+ zfree(bki);
continue;
}
incrRefCount(keys[j]);
@@ -545,6 +557,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
l = dictGetVal(de);
}
listAddNodeTail(l,c);
+ bki->listnode = listLast(l);
}
blockClient(c,btype);
}
@@ -561,11 +574,12 @@ void unblockClientWaitingData(client *c) {
/* The client may wait for multiple keys, so unblock it for every key. */
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
+ bkinfo *bki = dictGetVal(de);
/* Remove this client from the list of clients waiting for this key. */
l = dictFetchValue(c->db->blocking_keys,key);
serverAssertWithInfo(c,key,l != NULL);
- listDelNode(l,listSearchKey(l,c));
+ listDelNode(l,bki->listnode);
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
dictDelete(c->db->blocking_keys,key);
diff --git a/src/server.c b/src/server.c
index f6faa61a5..3d3aa7faa 100644
--- a/src/server.c
+++ b/src/server.c
@@ -987,7 +987,7 @@ void clientsCron(void) {
/* Rotate the list, take the current head, process.
* This way if the client must be removed from the list it's the
* first element and we don't incur into O(N) computation. */
- listRotate(server.clients);
+ listRotateTailToHead(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
/* The following functions do different service checks on the client.