diff options
author | Oran Agra <oran@redislabs.com> | 2022-04-27 16:32:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-27 16:32:17 +0300 |
commit | d375595d5e3ae2e5c29e6c00a2dc3d60578fd9fc (patch) | |
tree | c4d753d3ee0109e3513a879af8c5487e002d10a3 /src/networking.c | |
parent | fb4e0d400ff82117104bde5296c477ad95f8dd41 (diff) | |
parent | c1f3020631ea8640f2b6aa666a245dd76635a807 (diff) | |
download | redis-7.0.0.tar.gz |
Merge pull request #10652 from oranagra/redis-7.0.07.0.0
Redis 7.0.0
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 75 |
1 files changed, 47 insertions, 28 deletions
diff --git a/src/networking.c b/src/networking.c index 767d871d8..0664e2bf0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -160,6 +160,7 @@ client *createClient(connection *conn) { c->bulklen = -1; c->sentlen = 0; c->flags = 0; + c->slot = -1; c->ctime = c->lastinteraction = server.unixtime; clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; @@ -215,6 +216,23 @@ client *createClient(connection *conn) { return c; } +void installClientWriteHandler(client *c) { + int ae_barrier = 0; + /* For the fsync=always policy, we want that a given FD is never + * served for reading and writing in the same event loop iteration, + * so that in the middle of receiving the query, and serving it + * to the client, we'll call beforeSleep() that will do the + * actual fsync of AOF to disk. the write barrier ensures that. */ + if (server.aof_state == AOF_ON && + server.aof_fsync == AOF_FSYNC_ALWAYS) + { + ae_barrier = 1; + } + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { + freeClientAsync(c); + } +} + /* This function puts the client in the queue of clients that should write * their output buffers to the socket. Note that it does not *yet* install * the write handler, to start clients are put in a queue of clients that need @@ -222,7 +240,7 @@ client *createClient(connection *conn) { * handleClientsWithPendingWrites() function). * If we fail and there is more data to write, compared to what the socket * buffers can hold, then we'll really install the handler. */ -void clientInstallWriteHandler(client *c) { +void putClientInPendingWriteQueue(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the slave can actually receive * writes at this stage. */ @@ -285,11 +303,11 @@ int prepareClientToWrite(client *c) { * it should already be setup to do so (it has already pending data). * * If CLIENT_PENDING_READ is set, we're in an IO thread and should - * not install a write handler. Instead, it will be done by - * handleClientsWithPendingReadsUsingThreads() upon return. + * not put the client in pending write queue. Instead, it will be + * done by handleClientsWithPendingReadsUsingThreads() upon return. */ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -521,6 +539,21 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { showLatestBacklog(); } server.stat_unexpected_error_replies++; + + /* Based off the propagation error behavior, check if we need to panic here. There + * are currently two checked cases: + * * If this command was from our master and we are not a writable replica. + * * We are reading from an AOF file. */ + int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro) + && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC || + server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS); + int panic_in_aof = c->id == CLIENT_ID_AOF + && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC; + if (panic_in_replicas || panic_in_aof) { + serverPanic("This %s panicked sending an error to its %s" + " after processing the command '%s'", + from, to, cmdname ? cmdname : "<unknown>"); + } } } @@ -1061,7 +1094,7 @@ void addReplySubcommandSyntaxError(client *c) { sds cmd = sdsnew((char*) c->argv[0]->ptr); sdstoupper(cmd); addReplyErrorFormat(c, - "Unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.", + "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.", (char*)c->argv[1]->ptr,cmd); sdsfree(cmd); } @@ -1995,20 +2028,7 @@ int handleClientsWithPendingWrites(void) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { - int ae_barrier = 0; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. the write barrier ensures that. */ - if (server.aof_state == AOF_ON && - server.aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_barrier = 1; - } - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { - freeClientAsync(c); - } + installClientWriteHandler(c); } } return processed; @@ -2022,6 +2042,7 @@ void resetClient(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; + c->slot = -1; if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -2075,7 +2096,7 @@ void unprotectClient(client *c) { c->flags &= ~CLIENT_PROTECTED; if (c->conn) { connSetReadHandler(c->conn,readQueryFromClient); - if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); } } } @@ -2649,7 +2670,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ - if (processInputBuffer(c) == C_ERR) + if (processInputBuffer(c) == C_ERR) c = NULL; done: @@ -3808,7 +3829,7 @@ void flushSlavesOutputBuffers(void) { } } -/* Compute current most restictive pause type and its end time, aggregated for +/* Compute current most restrictive pause type and its end time, aggregated for * all pause purposes. */ static void updateClientPauseTypeAndEndTime(void) { pause_type old_type = server.client_pause_type; @@ -4212,10 +4233,8 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Install the write handler if there are pending writes in some * of the clients. */ - if (clientHasPendingReplies(c) && - connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) - { - freeClientAsync(c); + if (clientHasPendingReplies(c)) { + installClientWriteHandler(c); } } listEmpty(server.clients_pending_write); @@ -4327,10 +4346,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { } /* We may have pending replies if a thread readQueryFromClient() produced - * replies and did not install a write handler (it can't). + * replies and did not put the client in pending write queue (it can't). */ if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); } /* Update processed count on server */ |