diff options
author | antirez <antirez@gmail.com> | 2020-06-03 09:44:57 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-06-10 10:40:18 +0200 |
commit | 426b864345ea3683845cfa082dea83235394e177 (patch) | |
tree | 390f09ad7267c19fc2ac7fe127a3e60e22d714c6 | |
parent | dd53453e8fa7b8c2fc2821488b8692bea30a8f60 (diff) | |
download | redis-426b864345ea3683845cfa082dea83235394e177.tar.gz |
Threaded core commands: block clients on locked keys.
-rw-r--r-- | src/blocked.c | 2 | ||||
-rw-r--r-- | src/db.c | 13 | ||||
-rw-r--r-- | src/networking.c | 7 | ||||
-rw-r--r-- | src/server.c | 23 | ||||
-rw-r--r-- | src/server.h | 2 |
5 files changed, 42 insertions, 5 deletions
diff --git a/src/blocked.c b/src/blocked.c index a14330b5c..574b71d77 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -179,7 +179,7 @@ void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM || - c->btype == BLOCKED_LOCK) { + c->btype == BLOCKED_LOCK) { /* type LOCK never timeouts actually. */ addReplyNullArray(c); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); @@ -1824,3 +1824,16 @@ int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr) { *optr = lk->obj; return C_OK; } + +/* If the key this client is trying to access is locked, put it into + * its waiting list and return C_OK, otherwise return C_ERR. */ +int queueClientIfKeyIsLocked(client *c, robj *key) { + lockedKey *lk = dictFetchValue(c->db->locked_keys,key); + if (lk == NULL) return C_ERR; + + lockedKeyClient *lkc = zmalloc(sizeof(*lkc)); + lkc->id = c->id; + listAddNodeTail(lk->waiting,lkc); + return C_OK; +} + diff --git a/src/networking.c b/src/networking.c index 73650998c..7a4d9602e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1749,9 +1749,12 @@ void commandProcessed(client *c) { /* Don't reset the client structure for clients blocked in a * module blocking command, so that the reply callback will * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ + * The client will be reset in unblockClientFromModule(). + * Don't reset the client for clients waiting to be re-executed + * with the current command line because they are waiting for + * locked keys. */ if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) + (c->btype != BLOCKED_MODULE && c->btype != BLOCKED_LOCK)) { resetClient(c); } diff --git a/src/server.c b/src/server.c index 15863335c..ce9397852 100644 --- a/src/server.c +++ b/src/server.c @@ -3501,8 +3501,27 @@ int processCommand(client *c) { } /* We want to block this client if the keys it is going to access - * are locked. */ - if (dictSize(c->db->locked_keys)) { + * are locked, and the operation the client is going to perform + * is not read-only. For now we only block clients in the case of + * write operations since we are just supporting read locks so far. + * When keys will be also locked in write mode, read only operations + * will be queued as well. */ + if (c->cmd->flags & CMD_WRITE && dictSize(c->db->locked_keys)) { + int locked = 0; + int numkeys; + int *keyidx = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys); + for (int j = 0; j < numkeys; j++) { + if (queueClientIfKeyIsLocked(c,c->argv[keyidx[j]]) == C_OK) + locked++; + } + getKeysFreeResult(keyidx); + + /* Lock the client and return if we are waiting for at least one + * key. */ + if (locked) { + blockClient(c,BLOCKED_LOCK); + return C_OK; + } } /* Handle the maxmemory directive. diff --git a/src/server.h b/src/server.h index f33225bcb..65695fbf4 100644 --- a/src/server.h +++ b/src/server.h @@ -1628,6 +1628,8 @@ void moduleNotifyUserChanged(client *c); typedef void (*coreThreadedCommandCallback)(client *c, void *options); void executeThreadedCommand(client *c, coreThreadedCommandCallback callback, void *options); unsigned long runningThreadedCommandsCount(void); +int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr); +int queueClientIfKeyIsLocked(client *c, robj *key); /* Utils */ long long ustime(void); |