diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c index 09e17213c..eb110bd35 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -106,12 +106,11 @@ void blockClient(client *c, int btype) { void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){ const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; c->lastcmd->microseconds += total_cmd_duration; + /* Log the command into the Slow log if needed. */ - if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG)) { - slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration); - /* Log the reply duration event. */ - latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); - } + slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration); + /* Log the reply duration event. */ + latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); } /* This function is called in the beforeSleep() function of the event loop @@ -188,6 +187,16 @@ void unblockClient(client *c) { } else { serverPanic("Unknown btype in unblockClient()."); } + + /* Reset the client for a new query since, for blocking commands + * we do not do it immediately after the command returns (when the + * client got blocked) in order to be still able to access the argument + * vector from module callbacks and updateStatsOnUnblock. */ + if (c->btype != BLOCKED_PAUSE) { + freeClientOriginalArgv(c); + resetClient(c); + } + /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ server.blocked_clients--; @@ -279,7 +288,6 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); - unblockClient(receiver); monotime replyTimer; elapsedStart(&replyTimer); @@ -292,6 +300,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { listTypePush(o,value,wherefrom); } updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); if (dstkey) decrRefCount(dstkey); decrRefCount(value); @@ -335,11 +344,11 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { int where = (receiver->lastcmd && receiver->lastcmd->proc == bzpopminCommand) ? ZSET_MIN : ZSET_MAX; - unblockClient(receiver); monotime replyTimer; elapsedStart(&replyTimer); genericZpopCommand(receiver,&rl->key,1,where,1,NULL); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); zcard--; /* Replicate the command. */ @@ -471,6 +480,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { void serveClientsBlockedOnKeyByModule(readyList *rl) { dictEntry *de; + /* Optimization: If no clients are in type BLOCKED_MODULE, + * we can skip this loop. */ + if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return; + /* We serve clients in the same order they blocked for * this key, from the first blocked to the last. */ de = dictFind(rl->db->blocking_keys,rl->key); @@ -553,7 +566,7 @@ void handleClientsBlockedOnKeys(void) { * way we can lookup an object multiple times (BLMOVE does * that) without the risk of it being freed in the second * lookup, invalidating the first one. - * See https://github.com/antirez/redis/pull/6554. */ + * See https://github.com/redis/redis/pull/6554. */ server.fixed_time_expire++; updateCachedTime(0); |