summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-06-01 16:40:32 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commitdd53453e8fa7b8c2fc2821488b8692bea30a8f60 (patch)
tree227ddfdf326b32dc509bff2382852d64481a95a6
parent52db745c0daadf9943d006781fb1cd04ca03c0b9 (diff)
downloadredis-dd53453e8fa7b8c2fc2821488b8692bea30a8f60.tar.gz
Threaded core commands: clients wait queue WIP.
-rw-r--r--src/blocked.c21
-rw-r--r--src/server.c5
-rw-r--r--src/server.h3
3 files changed, 21 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 92f1cee65..a14330b5c 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -144,13 +144,15 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
- if (c->btype == BLOCKED_LIST ||
- c->btype == BLOCKED_ZSET ||
- c->btype == BLOCKED_STREAM) {
+ int btype = c->btype;
+
+ if (btype == BLOCKED_LIST ||
+ btype == BLOCKED_ZSET ||
+ btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
- } else if (c->btype == BLOCKED_WAIT) {
+ } else if (btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
- } else if (c->btype == BLOCKED_MODULE) {
+ } else if (btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else {
@@ -159,11 +161,15 @@ void unblockClient(client *c) {
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
server.blocked_clients--;
- server.blocked_clients_by_type[c->btype]--;
+ server.blocked_clients_by_type[btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
removeClientFromTimeoutTable(c);
queueClientForReprocessing(c);
+
+ /* Re-execute the command if it was not executed at all since the
+ * client was blocked because of key locks. */
+ if (btype == BLOCKED_LOCK) processCommand(c);
}
/* This function gets called when a blocked client timed out in order to
@@ -172,7 +178,8 @@ void unblockClient(client *c) {
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
- c->btype == BLOCKED_STREAM) {
+ c->btype == BLOCKED_STREAM ||
+ c->btype == BLOCKED_LOCK) {
addReplyNullArray(c);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
diff --git a/src/server.c b/src/server.c
index 6ce40c4cf..15863335c 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3500,6 +3500,11 @@ int processCommand(client *c) {
}
}
+ /* We want to block this client if the keys it is going to access
+ * are locked. */
+ if (dictSize(c->db->locked_keys)) {
+ }
+
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
diff --git a/src/server.h b/src/server.h
index f60ca38d3..f33225bcb 100644
--- a/src/server.h
+++ b/src/server.h
@@ -265,7 +265,8 @@ typedef long long ustime_t; /* microsecond time type. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_ZSET 5 /* BZPOP et al. */
-#define BLOCKED_NUM 6 /* Number of blocked states. */
+#define BLOCKED_LOCK 6 /* Waiting for locked keys. */
+#define BLOCKED_NUM 7 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1