summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-09-03 18:39:18 +0200
committerantirez <antirez@gmail.com>2018-09-03 18:39:18 +0200
commit6c001bfc0ddf8e7a93e86dad250e416166253d85 (patch)
tree134d234df4da8d5af23f5652ae27eb900ae28d5b
parent2b689ad641f19573b3b6141b45452da90ca32df4 (diff)
downloadredis-6c001bfc0ddf8e7a93e86dad250e416166253d85.tar.gz
Unblocked clients API refactoring. See #4418.
-rw-r--r--src/blocked.c32
-rw-r--r--src/networking.c9
-rw-r--r--src/scripting.c6
-rw-r--r--src/server.h1
4 files changed, 33 insertions, 15 deletions
diff --git a/src/blocked.c b/src/blocked.c
index aeca87a6e..00212ed69 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -132,6 +132,31 @@ void processUnblockedClients(void) {
}
}
+/* This function will schedule the client for reprocessing at a safe time.
+ *
+ * This is useful when a client was blocked for some reason (blocking opeation,
+ * CLIENT PAUSE, or whatever), because it may end with some accumulated query
+ * buffer that needs to be processed ASAP:
+ *
+ * 1. When a client is blocked, its readable handler is still active.
+ * 2. However in this case it only gets data into the query buffer, but the
+ * query is not parsed or executed once there is enough to proceed as
+ * usually (because the client is blocked... so we can't execute commands).
+ * 3. When the client is unblocked, without this function, the client would
+ * have to write some query in order for the readable handler to finally
+ * call processQueryBuffer*() on it.
+ * 4. With this function instead we can put the client in a queue that will
+ * process it for queries ready to be executed at a safe time.
+ */
+void queueClientForReprocessing(client *c) {
+ /* The client may already be into the unblocked list because of a previous
+ * blocking operation, don't add back it into the list multiple times. */
+ if (!(c->flags & CLIENT_UNBLOCKED)) {
+ c->flags |= CLIENT_UNBLOCKED;
+ listAddNodeTail(server.unblocked_clients,c);
+ }
+}
+
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
@@ -152,12 +177,7 @@ void unblockClient(client *c) {
server.blocked_clients_by_type[c->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
- /* The client may already be into the unblocked list because of a previous
- * blocking operation, don't add back it into the list multiple times. */
- if (!(c->flags & CLIENT_UNBLOCKED)) {
- c->flags |= CLIENT_UNBLOCKED;
- listAddNodeTail(server.unblocked_clients,c);
- }
+ queueClientForReprocessing(c);
}
/* This function gets called when a blocked client timed out in order to
diff --git a/src/networking.c b/src/networking.c
index 824d88245..0c1b3016f 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2134,11 +2134,10 @@ int clientsArePaused(void) {
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
- /* Don't touch slaves and blocked or unblocked clients.
- * The latter pending requests be processed when unblocked. */
- if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED|CLIENT_UNBLOCKED)) continue;
- c->flags |= CLIENT_UNBLOCKED;
- listAddNodeTail(server.unblocked_clients,c);
+ /* Don't touch slaves and blocked clients.
+ * The latter pending requests will be processed when unblocked. */
+ if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
+ queueClientForReprocessing(c);
}
}
return server.clients_paused;
diff --git a/src/scripting.c b/src/scripting.c
index 6c311dbe0..8aa62071c 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -1367,10 +1367,8 @@ void evalGenericCommand(client *c, int evalsha) {
* script timeout was detected. */
aeCreateFileEvent(server.el,c->fd,AE_READABLE,
readQueryFromClient,c);
- if (server.masterhost && server.master && !(server.master->flags & CLIENT_UNBLOCKED)) {
- server.master->flags |= CLIENT_UNBLOCKED;
- listAddNodeTail(server.unblocked_clients,server.master);
- }
+ if (server.masterhost && server.master)
+ queueClientForReprocessing(server.master);
}
server.lua_caller = NULL;
diff --git a/src/server.h b/src/server.h
index 4a5967f10..09348585b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1884,6 +1884,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body);
void processUnblockedClients(void);
void blockClient(client *c, int btype);
void unblockClient(client *c);
+void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);