summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-06-03 09:44:57 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit426b864345ea3683845cfa082dea83235394e177 (patch)
tree390f09ad7267c19fc2ac7fe127a3e60e22d714c6
parentdd53453e8fa7b8c2fc2821488b8692bea30a8f60 (diff)
downloadredis-426b864345ea3683845cfa082dea83235394e177.tar.gz
Threaded core commands: block clients on locked keys.
-rw-r--r--src/blocked.c2
-rw-r--r--src/db.c13
-rw-r--r--src/networking.c7
-rw-r--r--src/server.c23
-rw-r--r--src/server.h2
5 files changed, 42 insertions, 5 deletions
diff --git a/src/blocked.c b/src/blocked.c
index a14330b5c..574b71d77 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -179,7 +179,7 @@ void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM ||
- c->btype == BLOCKED_LOCK) {
+ c->btype == BLOCKED_LOCK) { /* type LOCK never timeouts actually. */
addReplyNullArray(c);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
diff --git a/src/db.c b/src/db.c
index b7e349adb..179018cd5 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1824,3 +1824,16 @@ int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr) {
*optr = lk->obj;
return C_OK;
}
+
+/* If the key this client is trying to access is locked, put it into
+ * its waiting list and return C_OK, otherwise return C_ERR. */
+int queueClientIfKeyIsLocked(client *c, robj *key) {
+ lockedKey *lk = dictFetchValue(c->db->locked_keys,key);
+ if (lk == NULL) return C_ERR;
+
+ lockedKeyClient *lkc = zmalloc(sizeof(*lkc));
+ lkc->id = c->id;
+ listAddNodeTail(lk->waiting,lkc);
+ return C_OK;
+}
+
diff --git a/src/networking.c b/src/networking.c
index 73650998c..7a4d9602e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1749,9 +1749,12 @@ void commandProcessed(client *c) {
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
- * The client will be reset in unblockClientFromModule(). */
+ * The client will be reset in unblockClientFromModule().
+ * Don't reset the client for clients waiting to be re-executed
+ * with the current command line because they are waiting for
+ * locked keys. */
if (!(c->flags & CLIENT_BLOCKED) ||
- c->btype != BLOCKED_MODULE)
+ (c->btype != BLOCKED_MODULE && c->btype != BLOCKED_LOCK))
{
resetClient(c);
}
diff --git a/src/server.c b/src/server.c
index 15863335c..ce9397852 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3501,8 +3501,27 @@ int processCommand(client *c) {
}
/* We want to block this client if the keys it is going to access
- * are locked. */
- if (dictSize(c->db->locked_keys)) {
+ * are locked, and the operation the client is going to perform
+ * is not read-only. For now we only block clients in the case of
+ * write operations since we are just supporting read locks so far.
+ * When keys will be also locked in write mode, read only operations
+ * will be queued as well. */
+ if (c->cmd->flags & CMD_WRITE && dictSize(c->db->locked_keys)) {
+ int locked = 0;
+ int numkeys;
+ int *keyidx = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
+ for (int j = 0; j < numkeys; j++) {
+ if (queueClientIfKeyIsLocked(c,c->argv[keyidx[j]]) == C_OK)
+ locked++;
+ }
+ getKeysFreeResult(keyidx);
+
+ /* Lock the client and return if we are waiting for at least one
+ * key. */
+ if (locked) {
+ blockClient(c,BLOCKED_LOCK);
+ return C_OK;
+ }
}
/* Handle the maxmemory directive.
diff --git a/src/server.h b/src/server.h
index f33225bcb..65695fbf4 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1628,6 +1628,8 @@ void moduleNotifyUserChanged(client *c);
typedef void (*coreThreadedCommandCallback)(client *c, void *options);
void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, void *options);
unsigned long runningThreadedCommandsCount(void);
+int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr);
+int queueClientIfKeyIsLocked(client *c, robj *key);
/* Utils */
long long ustime(void);