diff options
author | antirez <antirez@gmail.com> | 2020-04-08 12:55:57 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-04-08 19:22:56 +0200 |
commit | a5e24eabc306f7fcead0a9d2a5f1433401b6c0ce (patch) | |
tree | 2a6ee17a113212c6bea02a61c3c973abed1b0d57 | |
parent | 1f7d08b76dc83fd2aa43ee7fc04ab4ee01016628 (diff) | |
download | redis-a5e24eabc306f7fcead0a9d2a5f1433401b6c0ce.tar.gz |
Speedup: unblock clients on keys in O(1).o1-bpop-5.0
See #7071.
-rw-r--r-- | src/adlist.c | 20 | ||||
-rw-r--r-- | src/adlist.h | 3 | ||||
-rw-r--r-- | src/blocked.c | 46 | ||||
-rw-r--r-- | src/server.c | 2 |
4 files changed, 50 insertions, 21 deletions
diff --git a/src/adlist.c b/src/adlist.c index ec5f8bbf4..0fedc0729 100644 --- a/src/adlist.c +++ b/src/adlist.c @@ -327,12 +327,11 @@ listNode *listIndex(list *list, long index) { } /* Rotate the list removing the tail node and inserting it to the head. */ -void listRotate(list *list) { - listNode *tail = list->tail; - +void listRotateTailToHead(list *list) { if (listLength(list) <= 1) return; /* Detach current tail */ + listNode *tail = list->tail; list->tail = tail->prev; list->tail->next = NULL; /* Move it as head */ @@ -342,6 +341,21 @@ void listRotate(list *list) { list->head = tail; } +/* Rotate the list removing the head node and inserting it to the tail. */ +void listRotateHeadToTail(list *list) { + if (listLength(list) <= 1) return; + + listNode *head = list->head; + /* Detach current head */ + list->head = head->next; + list->head->prev = NULL; + /* Move it as tail */ + list->tail->next = head; + head->next = NULL; + head->prev = list->tail; + list->tail = head; +} + /* Add all the elements of the list 'o' at the end of the * list 'l'. The list 'other' remains empty but otherwise valid. */ void listJoin(list *l, list *o) { diff --git a/src/adlist.h b/src/adlist.h index c954fac87..2a83cd977 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -85,7 +85,8 @@ listNode *listSearchKey(list *list, void *key); listNode *listIndex(list *list, long index); void listRewind(list *list, listIter *li); void listRewindTail(list *list, listIter *li); -void listRotate(list *list); +void listRotateTailToHead(list *list); +void listRotateHeadToTail(list *list); void listJoin(list *l, list *o); /* Directions for iterators */ diff --git a/src/blocked.c b/src/blocked.c index b7506173d..d8413b80c 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -67,6 +67,21 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); +/* This structure represents the blocked key information that we store + * in the client structure. Each client blocked on keys, has a + * client->bpop.keys hash table. The keys of the hash table are Redis + * keys pointers to 'robj' structures. The value is this structure. + * The structure has two goals: firstly we store the list node that this + * client uses to be listed in the database "blocked clients for this key" + * list, so we can later unblock in O(1) without a list scan. + * Secondly for certain blocking types, we have additional info. Right now + * the only use for additional info we have is when clients are blocked + * on streams, as we have to remember the ID it blocked for. */ +typedef struct bkinfo { + listNode *listnode; /* List node for db->blocking_keys[key] list. */ + streamID stream_id; /* Stream ID if we blocked in a stream. */ +} bkinfo; + /* Get a timeout value from an object and store it into 'timeout'. * The final timeout is always stored as milliseconds as a time where the * timeout will expire, however the parsing is performed according to @@ -291,8 +306,7 @@ void handleClientsBlockedOnKeys(void) { if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -353,8 +367,7 @@ void handleClientsBlockedOnKeys(void) { if (receiver->btype != BLOCKED_ZSET) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -398,8 +411,9 @@ void handleClientsBlockedOnKeys(void) { while((ln = listNext(&li))) { client *receiver = listNodeValue(ln); if (receiver->btype != BLOCKED_STREAM) continue; - streamID *gt = dictFetchValue(receiver->bpop.keys, - rl->key); + bkinfo *bki = dictFetchValue(receiver->bpop.keys, + rl->key); + streamID *gt = &bki->stream_id; /* If we blocked in the context of a consumer * group, we need to resolve the group and update the @@ -516,17 +530,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { - /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists and sorted sets, or the stream ID for streams. */ - void *key_data = NULL; - if (btype == BLOCKED_STREAM) { - key_data = zmalloc(sizeof(streamID)); - memcpy(key_data,ids+j,sizeof(streamID)); - } + /* Allocate our bkinfo structure, associated to each key the client + * is blocked for. */ + bkinfo *bki = zmalloc(sizeof(*bki)); + if (btype == BLOCKED_STREAM) + bki->stream_id = ids[j]; /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { - zfree(key_data); + if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { + zfree(bki); continue; } incrRefCount(keys[j]); @@ -545,6 +557,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo l = dictGetVal(de); } listAddNodeTail(l,c); + bki->listnode = listLast(l); } blockClient(c,btype); } @@ -561,11 +574,12 @@ void unblockClientWaitingData(client *c) { /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); + bkinfo *bki = dictGetVal(de); /* Remove this client from the list of clients waiting for this key. */ l = dictFetchValue(c->db->blocking_keys,key); serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,listSearchKey(l,c)); + listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) dictDelete(c->db->blocking_keys,key); diff --git a/src/server.c b/src/server.c index f6faa61a5..3d3aa7faa 100644 --- a/src/server.c +++ b/src/server.c @@ -987,7 +987,7 @@ void clientsCron(void) { /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ - listRotate(server.clients); + listRotateTailToHead(server.clients); head = listFirst(server.clients); c = listNodeValue(head); /* The following functions do different service checks on the client. |