summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-04-12 17:18:10 +0200
committerantirez <antirez@gmail.com>2019-04-30 15:16:07 +0200
commitdf0e41b433bd70367a0c37ebd31f22c839226b34 (patch)
treed2a770dc5feb8e4b990c1427137714b548a9ae83
parent45167c56de0d2af780403bd5c806eda6e15ed7a3 (diff)
downloadredis-df0e41b433bd70367a0c37ebd31f22c839226b34.tar.gz
Threaded IO: parsing WIP 1: set current_client in a better scoped way.
-rw-r--r--src/networking.c23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/networking.c b/src/networking.c
index 0e11e1f3f..3faaf4a12 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1563,7 +1563,7 @@ int processMultibulkBuffer(client *c) {
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
- server.current_client = c;
+ int deadclient = 0;
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
@@ -1619,6 +1619,7 @@ void processInputBuffer(client *c) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
+ server.current_client = c;
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
@@ -1629,23 +1630,26 @@ void processInputBuffer(client *c) {
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
- if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
+ if (!(c->flags & CLIENT_BLOCKED) ||
+ c->btype != BLOCKED_MODULE)
+ {
resetClient(c);
+ }
}
+ if (server.current_client == NULL) deadclient = 1;
+ server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
- if (server.current_client == NULL) break;
+ if (deadclient) break;
}
}
/* Trim to pos */
- if (server.current_client != NULL && c->qb_pos) {
+ if (!deadclient && c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
-
- server.current_client = NULL;
}
/* This is a wrapper for processInputBuffer that also cares about handling
@@ -1743,11 +1747,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* There is more data in the client input buffer, continue parsing it
- * in case to check if there is a full command to execute.
- * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means
- * we are currently in the context of an I/O thread. */
- if (!(c->flags & CLIENT_PENDING_READ))
- processInputBufferAndReplicate(c);
+ * in case to check if there is a full command to execute. */
+ processInputBufferAndReplicate(c);
}
void getClientsMaxBuffers(unsigned long *longest_output_list,