From 96a54866ab4694cf338af0441f28aa69e9643376 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 8 Apr 2020 12:55:57 +0200 Subject: Speedup: unblock clients on keys in O(1). See #7071. --- src/blocked.c | 48 ++++++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 18 deletions(-) (limited to 'src/blocked.c') diff --git a/src/blocked.c b/src/blocked.c index e3a803ae3..00cc798d5 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -64,6 +64,21 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); +/* This structure represents the blocked key information that we store + * in the client structure. Each client blocked on keys, has a + * client->bpop.keys hash table. The keys of the hash table are Redis + * keys pointers to 'robj' structures. The value is this structure. + * The structure has two goals: firstly we store the list node that this + * client uses to be listed in the database "blocked clients for this key" + * list, so we can later unblock in O(1) without a list scan. + * Secondly for certain blocking types, we have additional info. Right now + * the only use for additional info we have is when clients are blocked + * on streams, as we have to remember the ID it blocked for. */ +typedef struct bkinfo { + listNode *listnode; /* List node for db->blocking_keys[key] list. */ + streamID stream_id; /* Stream ID if we blocked in a stream. */ +} bkinfo; + /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ @@ -211,8 +226,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -273,8 +287,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_ZSET) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -320,8 +333,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { while((ln = listNext(&li))) { client *receiver = listNodeValue(ln); if (receiver->btype != BLOCKED_STREAM) continue; - streamID *gt = dictFetchValue(receiver->bpop.keys, - rl->key); + bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); + streamID *gt = &bki->stream_id; /* If we blocked in the context of a consumer * group, we need to resolve the group and update the @@ -419,8 +432,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * ready to be served, so they'll remain in the list * sometimes. We want also be able to skip clients that are * not blocked for the MODULE type safely. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); if (receiver->btype != BLOCKED_MODULE) continue; @@ -551,17 +563,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { - /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists and sorted sets, or the stream ID for streams. */ - void *key_data = NULL; - if (btype == BLOCKED_STREAM) { - key_data = zmalloc(sizeof(streamID)); - memcpy(key_data,ids+j,sizeof(streamID)); - } + /* Allocate our bkinfo structure, associated to each key the client + * is blocked for. */ + bkinfo *bki = zmalloc(sizeof(*bki)); + if (btype == BLOCKED_STREAM) + bki->stream_id = ids[j]; /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { - zfree(key_data); + if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { + zfree(bki); continue; } incrRefCount(keys[j]); @@ -580,6 +590,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo l = dictGetVal(de); } listAddNodeTail(l,c); + bki->listnode = listLast(l); } blockClient(c,btype); } @@ -596,11 +607,12 @@ void unblockClientWaitingData(client *c) { /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); + bkinfo *bki = dictGetVal(de); /* Remove this client from the list of clients waiting for this key. */ l = dictFetchValue(c->db->blocking_keys,key); serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,listSearchKey(l,c)); + listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) dictDelete(c->db->blocking_keys,key); -- cgit v1.2.1