summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c25
1 files changed, 23 insertions, 2 deletions
diff --git a/src/blocked.c b/src/blocked.c
index d85723458..46935c79f 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -89,6 +89,12 @@ void blockClient(client *c, int btype) {
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
+ if (btype == BLOCKED_PAUSE) {
+ listAddNodeTail(server.paused_clients, c);
+ c->paused_list_node = listLast(server.paused_clients);
+ /* Mark this client to execute its command */
+ c->flags |= CLIENT_PENDING_COMMAND;
+ }
}
/* This function is called in the beforeSleep() function of the event loop
@@ -110,6 +116,11 @@ void processUnblockedClients(void) {
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
+ /* If we have a queued command, execute it now. */
+ if (processPendingCommandsAndResetClient(c) == C_ERR) {
+ continue;
+ }
+ /* Then process client if it has more data in it's buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c);
}
@@ -154,6 +165,9 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
+ } else if (c->btype == BLOCKED_PAUSE) {
+ listDelNode(server.paused_clients,c->paused_list_node);
+ c->paused_list_node = NULL;
} else {
serverPanic("Unknown btype in unblockClient().");
}
@@ -200,9 +214,16 @@ void disconnectAllBlockedClients(void) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
- addReplySds(c,sdsnew(
+ /* PAUSED clients are an exception, when they'll be unblocked, the
+ * command processing will start from scratch, and the command will
+ * be either executed or rejected. (unlike LIST blocked clients for
+ * which the command is already in progress in a way. */
+ if (c->btype == BLOCKED_PAUSE)
+ continue;
+
+ addReplyError(c,
"-UNBLOCKED force unblock from blocking operation, "
- "instance state changed (master -> replica?)\r\n"));
+ "instance state changed (master -> replica?)");
unblockClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}