From dd53453e8fa7b8c2fc2821488b8692bea30a8f60 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 1 Jun 2020 16:40:32 +0200 Subject: Threaded core commands: clients wait queue WIP. --- src/blocked.c | 21 ++++++++++++++------- src/server.c | 5 +++++ src/server.h | 3 ++- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 92f1cee65..a14330b5c 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -144,13 +144,15 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + int btype = c->btype; + + if (btype == BLOCKED_LIST || + btype == BLOCKED_ZSET || + btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->btype == BLOCKED_WAIT) { + } else if (btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); - } else if (c->btype == BLOCKED_MODULE) { + } else if (btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); } else { @@ -159,11 +161,15 @@ void unblockClient(client *c) { /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ server.blocked_clients--; - server.blocked_clients_by_type[c->btype]--; + server.blocked_clients_by_type[btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; removeClientFromTimeoutTable(c); queueClientForReprocessing(c); + + /* Re-execute the command if it was not executed at all since the + * client was blocked because of key locks. */ + if (btype == BLOCKED_LOCK) processCommand(c); } /* This function gets called when a blocked client timed out in order to @@ -172,7 +178,8 @@ void unblockClient(client *c) { void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + c->btype == BLOCKED_STREAM || + c->btype == BLOCKED_LOCK) { addReplyNullArray(c); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); diff --git a/src/server.c b/src/server.c index 6ce40c4cf..15863335c 100644 --- a/src/server.c +++ b/src/server.c @@ -3500,6 +3500,11 @@ int processCommand(client *c) { } } + /* We want to block this client if the keys it is going to access + * are locked. */ + if (dictSize(c->db->locked_keys)) { + } + /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering diff --git a/src/server.h b/src/server.h index f60ca38d3..f33225bcb 100644 --- a/src/server.h +++ b/src/server.h @@ -265,7 +265,8 @@ typedef long long ustime_t; /* microsecond time type. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ #define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_NUM 6 /* Number of blocked states. */ +#define BLOCKED_LOCK 6 /* Waiting for locked keys. */ +#define BLOCKED_NUM 7 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 -- cgit v1.2.1