diff options
author | antirez <antirez@gmail.com> | 2019-04-26 19:29:50 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2019-05-06 18:02:51 +0200 |
commit | 6ab6a97fe6991d1496a3c8efa52280db3a3df3eb (patch) | |
tree | 434fe5c05954c9d716162367f596530efb3c7567 /src/networking.c | |
parent | 647a66ebba5d12d461e830f174a1c90a4e96c5cd (diff) | |
download | redis-6ab6a97fe6991d1496a3c8efa52280db3a3df3eb.tar.gz |
Threaded IO: parsing WIP 2: refactoring to parse from thread.
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 87 |
1 files changed, 59 insertions, 28 deletions
diff --git a/src/networking.c b/src/networking.c index 3faaf4a12..4361ab1af 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1558,13 +1558,47 @@ int processMultibulkBuffer(client *c) { return C_ERR; } +/* This function calls processCommand(), but also performs a few sub tasks + * that are useful in that context: + * + * 1. It sets the current client to the client 'c'. + * 2. In the case of master clients, the replication offset is updated. + * 3. The client is reset unless there are reasons to avoid doing it. + * + * The function returns C_ERR in case the client was freed as a side effect + * of processing the command, otherwise C_OK is returned. */ +int processCommandAndResetClient(client *c) { + int deadclient = 0; + server.current_client = c; + if (processCommand(c) == C_OK) { + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. + * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } + } + if (server.current_client == NULL) deadclient = 1; + server.current_client = NULL; + /* freeMemoryIfNeeded may flush slave output buffers. This may + * result into a slave, that may be the active client, to be + * freed. */ + return deadclient ? C_ERR : C_OK; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { - int deadclient = 0; - /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Return if clients are paused. */ @@ -1573,6 +1607,10 @@ void processInputBuffer(client *c) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; + /* Don't process more buffers from clients that have already pending + * commands to execute in c->argv. */ + if (c->flags & CLIENT_PENDING_COMMAND) break; + /* Don't process input from the master while there is a busy script * condition on the slave. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and @@ -1618,35 +1656,26 @@ void processInputBuffer(client *c) { if (c->argc == 0) { resetClient(c); } else { - /* Only reset the client when the command was executed. */ - server.current_client = c; - if (processCommand(c) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } + /* If we are in the context of an I/O thread, we can't really + * execute the command here. All we can do is to flag the client + * as one that needs to process the command. */ + if (c->flags & CLIENT_PENDING_READ) { + c->flags |= CLIENT_PENDING_COMMAND; + break; + } - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) - { - resetClient(c); - } + /* We are finally ready to execute the command. */ + if (processCommandAndResetClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid exiting this + * loop and trimming the client buffer later. So we return + * ASAP in that case. */ + return; } - if (server.current_client == NULL) deadclient = 1; - server.current_client = NULL; - /* freeMemoryIfNeeded may flush slave output buffers. This may - * result into a slave, that may be the active client, to be - * freed. */ - if (deadclient) break; } } /* Trim to pos */ - if (!deadclient && c->qb_pos) { + if (c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } @@ -1737,9 +1766,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); -// FIXME: This may be called from an I/O thread and it is not safe to -// log from there for now. -// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); + serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); @@ -2747,6 +2774,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~ CLIENT_PENDING_COMMAND; + processCommandAndResetClient(c); + } processInputBufferAndReplicate(c); } listEmpty(server.clients_pending_read); |