summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c29
1 files changed, 21 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 09e17213c..eb110bd35 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -106,12 +106,11 @@ void blockClient(client *c, int btype) {
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
+
/* Log the command into the Slow log if needed. */
- if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG)) {
- slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration);
- /* Log the reply duration event. */
- latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
- }
+ slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
+ /* Log the reply duration event. */
+ latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
}
/* This function is called in the beforeSleep() function of the event loop
@@ -188,6 +187,16 @@ void unblockClient(client *c) {
} else {
serverPanic("Unknown btype in unblockClient().");
}
+
+ /* Reset the client for a new query since, for blocking commands
+ * we do not do it immediately after the command returns (when the
+ * client got blocked) in order to be still able to access the argument
+ * vector from module callbacks and updateStatsOnUnblock. */
+ if (c->btype != BLOCKED_PAUSE) {
+ freeClientOriginalArgv(c);
+ resetClient(c);
+ }
+
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
server.blocked_clients--;
@@ -279,7 +288,6 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
- unblockClient(receiver);
monotime replyTimer;
elapsedStart(&replyTimer);
@@ -292,6 +300,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
listTypePush(o,value,wherefrom);
}
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ unblockClient(receiver);
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
@@ -335,11 +344,11 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
- unblockClient(receiver);
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ unblockClient(receiver);
zcard--;
/* Replicate the command. */
@@ -471,6 +480,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
void serveClientsBlockedOnKeyByModule(readyList *rl) {
dictEntry *de;
+ /* Optimization: If no clients are in type BLOCKED_MODULE,
+ * we can skip this loop. */
+ if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
+
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
@@ -553,7 +566,7 @@ void handleClientsBlockedOnKeys(void) {
* way we can lookup an object multiple times (BLMOVE does
* that) without the risk of it being freed in the second
* lookup, invalidating the first one.
- * See https://github.com/antirez/redis/pull/6554. */
+ * See https://github.com/redis/redis/pull/6554. */
server.fixed_time_expire++;
updateCachedTime(0);