summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
authorHuang Zhw <huang_zhw@126.com>2021-10-07 20:13:42 +0800
committerGitHub <noreply@github.com>2021-10-07 15:13:42 +0300
commitfd135f3e2d3adfdec31980ee69be8f783fc8ddce (patch)
treea7c23ce9ac6ad87a3039a80839fbc8e8d5b7e9d4 /src/blocked.c
parentd98d1ad574076d05b83b1e3a7ed25c95377385fc (diff)
downloadredis-fd135f3e2d3adfdec31980ee69be8f783fc8ddce.tar.gz
Make tracking invalidation messages always after command's reply (#9422)
Tracking invalidation messages were sometimes sent in inconsistent order, before the command's reply rather than after. In addition to that, they were sometimes embedded inside other commands responses, like MULTI-EXEC and MGET.
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c16
1 files changed, 16 insertions, 0 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 67fd3fdca..8a2e41463 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -295,6 +295,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* call. */
if (dstkey) incrRefCount(dstkey);
+ client *old_client = server.current_client;
+ server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
serveClientBlockedOnList(receiver, o,
@@ -303,6 +305,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
&deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
if (dstkey) decrRefCount(dstkey);
@@ -343,11 +347,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
? 1 : 0;
int reply_nil_when_empty = use_nested_array;
+ client *old_client = server.current_client;
+ server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
/* Replicate the command. */
int argc = 2;
@@ -442,6 +450,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
+ client *old_client = server.current_client;
+ server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
@@ -470,6 +480,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
}
}
}
@@ -514,12 +526,16 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
+ client *old_client = server.current_client;
+ server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
moduleUnblockClient(receiver);
+ afterCommand(receiver);
+ server.current_client = old_client;
}
}
}