summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-04-26 19:29:50 +0200
committerantirez <antirez@gmail.com>2019-05-06 18:02:51 +0200
commit6ab6a97fe6991d1496a3c8efa52280db3a3df3eb (patch)
tree434fe5c05954c9d716162367f596530efb3c7567 /src/networking.c
parent647a66ebba5d12d461e830f174a1c90a4e96c5cd (diff)
downloadredis-6ab6a97fe6991d1496a3c8efa52280db3a3df3eb.tar.gz
Threaded IO: parsing WIP 2: refactoring to parse from thread.
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c87
1 files changed, 59 insertions, 28 deletions
diff --git a/src/networking.c b/src/networking.c
index 3faaf4a12..4361ab1af 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1558,13 +1558,47 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}
+/* This function calls processCommand(), but also performs a few sub tasks
+ * that are useful in that context:
+ *
+ * 1. It sets the current client to the client 'c'.
+ * 2. In the case of master clients, the replication offset is updated.
+ * 3. The client is reset unless there are reasons to avoid doing it.
+ *
+ * The function returns C_ERR in case the client was freed as a side effect
+ * of processing the command, otherwise C_OK is returned. */
+int processCommandAndResetClient(client *c) {
+ int deadclient = 0;
+ server.current_client = c;
+ if (processCommand(c) == C_OK) {
+ if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
+ /* Update the applied replication offset of our master. */
+ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+ }
+
+ /* Don't reset the client structure for clients blocked in a
+ * module blocking command, so that the reply callback will
+ * still be able to access the client argv and argc field.
+ * The client will be reset in unblockClientFromModule(). */
+ if (!(c->flags & CLIENT_BLOCKED) ||
+ c->btype != BLOCKED_MODULE)
+ {
+ resetClient(c);
+ }
+ }
+ if (server.current_client == NULL) deadclient = 1;
+ server.current_client = NULL;
+ /* freeMemoryIfNeeded may flush slave output buffers. This may
+ * result into a slave, that may be the active client, to be
+ * freed. */
+ return deadclient ? C_ERR : C_OK;
+}
+
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
- int deadclient = 0;
-
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
@@ -1573,6 +1607,10 @@ void processInputBuffer(client *c) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
+ /* Don't process more buffers from clients that have already pending
+ * commands to execute in c->argv. */
+ if (c->flags & CLIENT_PENDING_COMMAND) break;
+
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
@@ -1618,35 +1656,26 @@ void processInputBuffer(client *c) {
if (c->argc == 0) {
resetClient(c);
} else {
- /* Only reset the client when the command was executed. */
- server.current_client = c;
- if (processCommand(c) == C_OK) {
- if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
- /* Update the applied replication offset of our master. */
- c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
- }
+ /* If we are in the context of an I/O thread, we can't really
+ * execute the command here. All we can do is to flag the client
+ * as one that needs to process the command. */
+ if (c->flags & CLIENT_PENDING_READ) {
+ c->flags |= CLIENT_PENDING_COMMAND;
+ break;
+ }
- /* Don't reset the client structure for clients blocked in a
- * module blocking command, so that the reply callback will
- * still be able to access the client argv and argc field.
- * The client will be reset in unblockClientFromModule(). */
- if (!(c->flags & CLIENT_BLOCKED) ||
- c->btype != BLOCKED_MODULE)
- {
- resetClient(c);
- }
+ /* We are finally ready to execute the command. */
+ if (processCommandAndResetClient(c) == C_ERR) {
+ /* If the client is no longer valid, we avoid exiting this
+ * loop and trimming the client buffer later. So we return
+ * ASAP in that case. */
+ return;
}
- if (server.current_client == NULL) deadclient = 1;
- server.current_client = NULL;
- /* freeMemoryIfNeeded may flush slave output buffers. This may
- * result into a slave, that may be the active client, to be
- * freed. */
- if (deadclient) break;
}
}
/* Trim to pos */
- if (!deadclient && c->qb_pos) {
+ if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
@@ -1737,9 +1766,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
-// FIXME: This may be called from an I/O thread and it is not safe to
-// log from there for now.
-// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
+ serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
@@ -2747,6 +2774,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
+ if (c->flags & CLIENT_PENDING_COMMAND) {
+ c->flags &= ~ CLIENT_PENDING_COMMAND;
+ processCommandAndResetClient(c);
+ }
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);