summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c42
1 files changed, 34 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 7fe29e198..7c10460b1 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -79,6 +79,7 @@ void initClientBlockingState(client *c) {
c->bstate.numreplicas = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
+ c->bstate.async_rm_call_handle = NULL;
}
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
@@ -92,7 +93,7 @@ void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
- server.blocked_clients++;
+ if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
}
@@ -131,6 +132,13 @@ void processUnblockedClients(void) {
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;
+ if (c->flags & CLIENT_MODULE) {
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ moduleCallCommandUnblockedHandler(c);
+ }
+ continue;
+ }
+
/* Process remaining data in the input buffer, unless the client
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
@@ -172,7 +180,7 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
-void unblockClient(client *c) {
+void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
@@ -205,13 +213,13 @@ void unblockClient(client *c) {
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
- server.blocked_clients--;
+ if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[c->bstate.btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
- queueClientForReprocessing(c);
+ if (queue_for_reprocessing) queueClientForReprocessing(c);
}
/* This function gets called when a blocked client timed out in order to
@@ -247,7 +255,7 @@ void replyToClientsBlockedOnShutdown(void) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
- unblockClient(c);
+ unblockClient(c, 1);
}
}
}
@@ -632,12 +640,30 @@ static void unblockClientOnKey(client *c, robj *key) {
/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
- unblockClient(c);
+ unblockClient(c, 0);
/* In case this client was blocked on keys during command
* we need to re process the command again */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
+ /* We want the command processing and the unblock handler (see RM_Call 'K' option)
+ * to run atomically, this is why we must enter the execution unit here before
+ * running the command, and exit the execution unit after calling the unblock handler (if exists).
+ * Notice that we also must set the current client so it will be available
+ * when we will try to send the the client side caching notification (done on 'afterCommand'). */
+ client *old_client = server.current_client;
+ server.current_client = c;
+ enterExecutionUnit(1, 0);
processCommandAndResetClient(c);
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ if (c->flags & CLIENT_MODULE) {
+ moduleCallCommandUnblockedHandler(c);
+ } else {
+ queueClientForReprocessing(c);
+ }
+ }
+ exitExecutionUnit();
+ afterCommand(c);
+ server.current_client = old_client;
}
}
@@ -673,7 +699,7 @@ void unblockClientOnTimeout(client *c) {
replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND)
c->flags &= ~CLIENT_PENDING_COMMAND;
- unblockClient(c);
+ unblockClient(c, 1);
}
/* Unblock a client which is currently Blocked with error.
@@ -684,7 +710,7 @@ void unblockClientOnError(client *c, const char *err_str) {
updateStatsOnUnblock(c, 0, 0, 1);
if (c->flags & CLIENT_PENDING_COMMAND)
c->flags &= ~CLIENT_PENDING_COMMAND;
- unblockClient(c);
+ unblockClient(c, 1);
}
/* sets blocking_keys to the total number of keys which has at least one client blocked on them