summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author杨博东 <yangbodong22011@gmail.com>2020-08-11 13:18:09 +0800
committerGitHub <noreply@github.com>2020-08-11 08:18:09 +0300
commit229327ad8be5cd87ead56884228dfb08b6017c45 (patch)
treed7887540494ca2b76e9a0e97043b2196817ec296
parentefe92ee5462f65b6a4dabb3d578a31bedcad2f33 (diff)
downloadredis-229327ad8be5cd87ead56884228dfb08b6017c45.tar.gz
Avoid redundant calls to signalKeyAsReady (#7625)
signalKeyAsReady has some overhead (namely dictFind) so we should only call it when there are clients blocked on the relevant type (BLOCKED_*)
-rw-r--r--src/blocked.c18
-rw-r--r--src/db.c10
-rw-r--r--src/module.c2
-rw-r--r--src/server.h2
-rw-r--r--src/t_stream.c5
5 files changed, 22 insertions, 15 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 92f1cee65..c3884fbdf 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -634,6 +634,16 @@ void unblockClientWaitingData(client *c) {
}
}
+static int getBlockedTypeByType(int type) {
+ switch (type) {
+ case OBJ_LIST: return BLOCKED_LIST;
+ case OBJ_ZSET: return BLOCKED_ZSET;
+ case OBJ_MODULE: return BLOCKED_MODULE;
+ case OBJ_STREAM: return BLOCKED_STREAM;
+ default: return BLOCKED_NONE;
+ }
+}
+
/* If the specified key has clients blocked waiting for list pushes, this
* function will put the key reference into the server.ready_keys list.
* Note that db->ready_keys is a hash table that allows us to avoid putting
@@ -641,9 +651,14 @@ void unblockClientWaitingData(client *c) {
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnKeys() */
-void signalKeyAsReady(redisDb *db, robj *key) {
+void signalKeyAsReady(redisDb *db, robj *key, int type) {
readyList *rl;
+ /* If no clients are blocked on this type, just return */
+ int btype = getBlockedTypeByType(type);
+ if (btype == BLOCKED_NONE || !server.blocked_clients_by_type[btype])
+ return;
+
/* No clients blocking for this key? No need to queue it. */
if (dictFind(db->blocking_keys,key) == NULL) return;
@@ -664,4 +679,3 @@ void signalKeyAsReady(redisDb *db, robj *key) {
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
-
diff --git a/src/db.c b/src/db.c
index 529b57c7b..70de568fb 100644
--- a/src/db.c
+++ b/src/db.c
@@ -181,10 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
- if (val->type == OBJ_LIST ||
- val->type == OBJ_ZSET ||
- val->type == OBJ_STREAM)
- signalKeyAsReady(db, key);
+ signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
@@ -1089,10 +1086,7 @@ void scanDatabaseForReadyLists(redisDb *db) {
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
- if (value && (value->type == OBJ_LIST ||
- value->type == OBJ_STREAM ||
- value->type == OBJ_ZSET))
- signalKeyAsReady(db, key);
+ if (value) signalKeyAsReady(db, key, value->type);
}
dictReleaseIterator(di);
}
diff --git a/src/module.c b/src/module.c
index 8e9526dad..0714458f7 100644
--- a/src/module.c
+++ b/src/module.c
@@ -4579,7 +4579,7 @@ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleC
* all the clients blocked for this key will get their reply callback called,
* and if the callback returns REDISMODULE_OK the client will be unblocked. */
void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
- signalKeyAsReady(ctx->client->db, key);
+ signalKeyAsReady(ctx->client->db, key, OBJ_MODULE);
}
/* Implements RM_UnblockClient() and moduleUnblockClient(). */
diff --git a/src/server.h b/src/server.h
index 19aab8750..dbfa5aead 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2189,7 +2189,7 @@ void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
-void signalKeyAsReady(redisDb *db, robj *key);
+void signalKeyAsReady(redisDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
/* timeout.c -- Blocked clients timeout and connections timeout. */
diff --git a/src/t_stream.c b/src/t_stream.c
index 38bdce8e0..fc5c4c99e 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1299,8 +1299,7 @@ void xaddCommand(client *c) {
/* We need to signal to blocked clients that there is new data on this
* stream. */
- if (server.blocked_clients_by_type[BLOCKED_STREAM])
- signalKeyAsReady(c->db, c->argv[1]);
+ signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
}
/* XRANGE/XREVRANGE actual implementation. */
@@ -1876,7 +1875,7 @@ NULL
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
c->argv[2],c->db->id);
/* We want to unblock any XREADGROUP consumers with -NOGROUP. */
- signalKeyAsReady(c->db,c->argv[2]);
+ signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
} else {
addReply(c,shared.czero);
}