summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMadelyn Olson <34459052+madolson@users.noreply.github.com>2021-02-22 13:05:00 -0800
committerGitHub <noreply@github.com>2021-02-22 23:05:00 +0200
commit0c65342889537fa644003db09b1ca15dcda7c731 (patch)
tree9c4999a7412ebe6dc09d2b039638052eeb7578ae
parent8d70d498d5263c370ac25f2736839f0ff6a9fb7d (diff)
downloadredis-0c65342889537fa644003db09b1ca15dcda7c731.tar.gz
Remove race condition and consistency issues with client pause and threaded IO (#8520)
clientsArePaused isn't thread safe because it has a side effect of attempting to unpause, which may cause multiple threads concurrently updating the unblocked_clients global list. This change resolves this issue by no longer postponing client for threaded reads when clients are paused and then skipping the check for client paused for threaded reads, in case one is postponed and then clients are paused. (I don't think this is strictly possible, but being defensive seems better here)
-rw-r--r--src/blocked.c8
-rw-r--r--src/networking.c38
-rw-r--r--src/server.h1
3 files changed, 37 insertions, 10 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 485617352..396a71569 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -98,6 +98,9 @@ void processUnblockedClients(void) {
client *c;
while (listLength(server.unblocked_clients)) {
+ /* If clients are paused we yield for now, since
+ * we don't want to process any commands later. */
+ if (clientsArePaused()) return;
ln = listFirst(server.unblocked_clients);
serverAssert(ln != NULL);
c = ln->value;
@@ -109,6 +112,11 @@ void processUnblockedClients(void) {
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
+ /* If we have a queued command, execute it now. */
+ if (processPendingCommandsAndResetClient(c) == C_ERR) {
+ continue;
+ }
+ /* Then process client if it has more data in it's buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c);
}
diff --git a/src/networking.c b/src/networking.c
index dd2ca1aae..21fef52ba 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1903,6 +1903,19 @@ int processCommandAndResetClient(client *c) {
return deadclient ? C_ERR : C_OK;
}
+/* This function will execute any fully parsed commands pending on
+ * the client. Returns C_ERR if the client is no longer valid after executing
+ * the command, and C_OK for all other cases. */
+int processPendingCommandsAndResetClient(client *c) {
+ if (c->flags & CLIENT_PENDING_COMMAND) {
+ c->flags &= ~CLIENT_PENDING_COMMAND;
+ if (processCommandAndResetClient(c) == C_ERR) {
+ return C_ERR;
+ }
+ }
+ return C_OK;
+}
+
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
@@ -1911,7 +1924,9 @@ void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
- if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
+ if (!(c->flags & CLIENT_SLAVE) &&
+ !(c->flags & CLIENT_PENDING_READ) &&
+ clientsArePaused()) break;
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
@@ -3269,6 +3284,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
+ !clientsArePaused() &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
@@ -3336,15 +3352,17 @@ int handleClientsWithPendingReadsUsingThreads(void) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
-
- if (c->flags & CLIENT_PENDING_COMMAND) {
- c->flags &= ~CLIENT_PENDING_COMMAND;
- if (processCommandAndResetClient(c) == C_ERR) {
- /* If the client is no longer valid, we avoid
- * processing the client later. So we just go
- * to the next. */
- continue;
- }
+ /* Clients can become paused while executing the queued commands,
+ * so we need to check in between each command. If a pause was
+ * executed, we still remove the command and it will get picked up
+ * later when clients are unpaused and we re-queue all clients. */
+ if (clientsArePaused()) continue;
+
+ if (processPendingCommandsAndResetClient(c) == C_ERR) {
+ /* If the client is no longer valid, we avoid
+ * processing the client later. So we just go
+ * to the next. */
+ continue;
}
processInputBuffer(c);
diff --git a/src/server.h b/src/server.h
index 96963c3b5..e48c1bf84 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2020,6 +2020,7 @@ size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void);
int freeMemoryIfNeededAndSafe(void);
int processCommand(client *c);
+int processPendingCommandsAndResetClient(client *c);
void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(const char *s);