diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c index 7fe29e198..7c10460b1 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -79,6 +79,7 @@ void initClientBlockingState(client *c) { c->bstate.numreplicas = 0; c->bstate.reploffset = 0; c->bstate.unblock_on_nokey = 0; + c->bstate.async_rm_call_handle = NULL; } /* Block a client for the specific operation type. Once the CLIENT_BLOCKED @@ -92,7 +93,7 @@ void blockClient(client *c, int btype) { c->flags |= CLIENT_BLOCKED; c->bstate.btype = btype; - server.blocked_clients++; + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */ server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); } @@ -131,6 +132,13 @@ void processUnblockedClients(void) { listDelNode(server.unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; + if (c->flags & CLIENT_MODULE) { + if (!(c->flags & CLIENT_BLOCKED)) { + moduleCallCommandUnblockedHandler(c); + } + continue; + } + /* Process remaining data in the input buffer, unless the client * is blocked again. Actually processInputBuffer() checks that the * client is not blocked before to proceed, but things may change and @@ -172,7 +180,7 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ -void unblockClient(client *c) { +void unblockClient(client *c, int queue_for_reprocessing) { if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { @@ -205,13 +213,13 @@ void unblockClient(client *c) { /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ - server.blocked_clients--; + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */ server.blocked_clients_by_type[c->bstate.btype]--; c->flags &= ~CLIENT_BLOCKED; c->bstate.btype = BLOCKED_NONE; c->bstate.unblock_on_nokey = 0; removeClientFromTimeoutTable(c); - queueClientForReprocessing(c); + if (queue_for_reprocessing) queueClientForReprocessing(c); } /* This function gets called when a blocked client timed out in order to @@ -247,7 +255,7 @@ void replyToClientsBlockedOnShutdown(void) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); - unblockClient(c); + unblockClient(c, 1); } } } @@ -632,12 +640,30 @@ static void unblockClientOnKey(client *c, robj *key) { /* We need to unblock the client before calling processCommandAndResetClient * because it checks the CLIENT_BLOCKED flag */ - unblockClient(c); + unblockClient(c, 0); /* In case this client was blocked on keys during command * we need to re process the command again */ if (c->flags & CLIENT_PENDING_COMMAND) { c->flags &= ~CLIENT_PENDING_COMMAND; + /* We want the command processing and the unblock handler (see RM_Call 'K' option) + * to run atomically, this is why we must enter the execution unit here before + * running the command, and exit the execution unit after calling the unblock handler (if exists). + * Notice that we also must set the current client so it will be available + * when we will try to send the the client side caching notification (done on 'afterCommand'). */ + client *old_client = server.current_client; + server.current_client = c; + enterExecutionUnit(1, 0); processCommandAndResetClient(c); + if (!(c->flags & CLIENT_BLOCKED)) { + if (c->flags & CLIENT_MODULE) { + moduleCallCommandUnblockedHandler(c); + } else { + queueClientForReprocessing(c); + } + } + exitExecutionUnit(); + afterCommand(c); + server.current_client = old_client; } } @@ -673,7 +699,7 @@ void unblockClientOnTimeout(client *c) { replyToBlockedClientTimedOut(c); if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND; - unblockClient(c); + unblockClient(c, 1); } /* Unblock a client which is currently Blocked with error. @@ -684,7 +710,7 @@ void unblockClientOnError(client *c, const char *err_str) { updateStatsOnUnblock(c, 0, 0, 1); if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND; - unblockClient(c); + unblockClient(c, 1); } /* sets blocking_keys to the total number of keys which has at least one client blocked on them |