summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2022-04-27 16:32:17 +0300
committerGitHub <noreply@github.com>2022-04-27 16:32:17 +0300
commitd375595d5e3ae2e5c29e6c00a2dc3d60578fd9fc (patch)
treec4d753d3ee0109e3513a879af8c5487e002d10a3 /src/networking.c
parentfb4e0d400ff82117104bde5296c477ad95f8dd41 (diff)
parentc1f3020631ea8640f2b6aa666a245dd76635a807 (diff)
downloadredis-7.0.0.tar.gz
Merge pull request #10652 from oranagra/redis-7.0.07.0.0
Redis 7.0.0
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c75
1 files changed, 47 insertions, 28 deletions
diff --git a/src/networking.c b/src/networking.c
index 767d871d8..0664e2bf0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -160,6 +160,7 @@ client *createClient(connection *conn) {
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
+ c->slot = -1;
c->ctime = c->lastinteraction = server.unixtime;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
@@ -215,6 +216,23 @@ client *createClient(connection *conn) {
return c;
}
+void installClientWriteHandler(client *c) {
+ int ae_barrier = 0;
+ /* For the fsync=always policy, we want that a given FD is never
+ * served for reading and writing in the same event loop iteration,
+ * so that in the middle of receiving the query, and serving it
+ * to the client, we'll call beforeSleep() that will do the
+ * actual fsync of AOF to disk. the write barrier ensures that. */
+ if (server.aof_state == AOF_ON &&
+ server.aof_fsync == AOF_FSYNC_ALWAYS)
+ {
+ ae_barrier = 1;
+ }
+ if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
+ freeClientAsync(c);
+ }
+}
+
/* This function puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need
@@ -222,7 +240,7 @@ client *createClient(connection *conn) {
* handleClientsWithPendingWrites() function).
* If we fail and there is more data to write, compared to what the socket
* buffers can hold, then we'll really install the handler. */
-void clientInstallWriteHandler(client *c) {
+void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
@@ -285,11 +303,11 @@ int prepareClientToWrite(client *c) {
* it should already be setup to do so (it has already pending data).
*
* If CLIENT_PENDING_READ is set, we're in an IO thread and should
- * not install a write handler. Instead, it will be done by
- * handleClientsWithPendingReadsUsingThreads() upon return.
+ * not put the client in pending write queue. Instead, it will be
+ * done by handleClientsWithPendingReadsUsingThreads() upon return.
*/
if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
- clientInstallWriteHandler(c);
+ putClientInPendingWriteQueue(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
@@ -521,6 +539,21 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) {
showLatestBacklog();
}
server.stat_unexpected_error_replies++;
+
+ /* Based off the propagation error behavior, check if we need to panic here. There
+ * are currently two checked cases:
+ * * If this command was from our master and we are not a writable replica.
+ * * We are reading from an AOF file. */
+ int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro)
+ && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC ||
+ server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS);
+ int panic_in_aof = c->id == CLIENT_ID_AOF
+ && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC;
+ if (panic_in_replicas || panic_in_aof) {
+ serverPanic("This %s panicked sending an error to its %s"
+ " after processing the command '%s'",
+ from, to, cmdname ? cmdname : "<unknown>");
+ }
}
}
@@ -1061,7 +1094,7 @@ void addReplySubcommandSyntaxError(client *c) {
sds cmd = sdsnew((char*) c->argv[0]->ptr);
sdstoupper(cmd);
addReplyErrorFormat(c,
- "Unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.",
+ "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.",
(char*)c->argv[1]->ptr,cmd);
sdsfree(cmd);
}
@@ -1995,20 +2028,7 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
- int ae_barrier = 0;
- /* For the fsync=always policy, we want that a given FD is never
- * served for reading and writing in the same event loop iteration,
- * so that in the middle of receiving the query, and serving it
- * to the client, we'll call beforeSleep() that will do the
- * actual fsync of AOF to disk. the write barrier ensures that. */
- if (server.aof_state == AOF_ON &&
- server.aof_fsync == AOF_FSYNC_ALWAYS)
- {
- ae_barrier = 1;
- }
- if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
- freeClientAsync(c);
- }
+ installClientWriteHandler(c);
}
}
return processed;
@@ -2022,6 +2042,7 @@ void resetClient(client *c) {
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
+ c->slot = -1;
if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
@@ -2075,7 +2096,7 @@ void unprotectClient(client *c) {
c->flags &= ~CLIENT_PROTECTED;
if (c->conn) {
connSetReadHandler(c->conn,readQueryFromClient);
- if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
+ if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
}
}
}
@@ -2649,7 +2670,7 @@ void readQueryFromClient(connection *conn) {
/* There is more data in the client input buffer, continue parsing it
* and check if there is a full command to execute. */
- if (processInputBuffer(c) == C_ERR)
+ if (processInputBuffer(c) == C_ERR)
c = NULL;
done:
@@ -3808,7 +3829,7 @@ void flushSlavesOutputBuffers(void) {
}
}
-/* Compute current most restictive pause type and its end time, aggregated for
+/* Compute current most restrictive pause type and its end time, aggregated for
* all pause purposes. */
static void updateClientPauseTypeAndEndTime(void) {
pause_type old_type = server.client_pause_type;
@@ -4212,10 +4233,8 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Install the write handler if there are pending writes in some
* of the clients. */
- if (clientHasPendingReplies(c) &&
- connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
- {
- freeClientAsync(c);
+ if (clientHasPendingReplies(c)) {
+ installClientWriteHandler(c);
}
}
listEmpty(server.clients_pending_write);
@@ -4327,10 +4346,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
/* We may have pending replies if a thread readQueryFromClient() produced
- * replies and did not install a write handler (it can't).
+ * replies and did not put the client in pending write queue (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
- clientInstallWriteHandler(c);
+ putClientInPendingWriteQueue(c);
}
/* Update processed count on server */