From a30678e6f6699b473ffcc0a4525083d6da608466 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 1 Jun 2020 13:57:16 +0200 Subject: Threaded core commands: initial lockKey() API. --- src/db.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/module.c | 3 +++ src/networking.c | 1 + src/server.c | 1 + src/server.h | 44 ++++++++++++++++++++++++++++++++- 5 files changed, 123 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index dc4a0b63e..b7e349adb 100644 --- a/src/db.c +++ b/src/db.c @@ -1749,3 +1749,78 @@ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int countKeysInSlot(unsigned int hashslot) { return server.cluster->slots_keys_count[hashslot]; } + +/*----------------------------------------------------------------------------- + * Locked keys API + *----------------------------------------------------------------------------*/ + +int lockedClientListMatch(void *a, void *b) { + lockedKeyClient *ca = a, *cb = b; + return ca->id == cb->id; +} + +/* Lock the specified key, in the specified DB and in the context of the + * specified client, and return the locked object by reference. + * The type of the lock can be KEYLOCK_READ or KEYLOCK_WRITE (XXX: not supported + * right now). + * + * Locking the key may fail because the key is already locked or because + * Redis is not able to lock keys in a given moment (because it is waiting + * to fork or for other reasons). When this happens, the command should + * signal to the caller that it requires to be rescheduled, or alternatively + * may implement the operation synchronously without locking any key. + * + * When locking fails C_ERR is returned, otherwise C_OK is returned. + * Note that 'optr' may be populated with NULL if the key that is going + * to be locked is empty. + * + * Signaling of the need to lock keys is performed using the + * wouldLockKey() API (XXX: yet to be implemented). Check the function + * documentation for more information. */ +int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr) { + dictEntry *de; + + de = dictFind(db->locked_keys,key); + lockedKey *lk = de ? dictGetVal(de) : NULL; + + if (lk == NULL) { + lk = zmalloc(sizeof(*lk)); + lk->lock_type = locktype; + lk->waiting = listCreate(); + lk->owners = listCreate(); + listSetMatchMethod(lk->waiting,lockedClientListMatch); + listSetMatchMethod(lk->owners,lockedClientListMatch); + listSetFreeMethod(lk->waiting,zfree); + listSetFreeMethod(lk->owners,zfree); + lk->obj = lookupKeyReadWithFlags(db,key,LOOKUP_NOTOUCH); + } else { + /* If there is already a lock, it is incompatible with a new lock + * both in the case the lock is of write type, or we want to lock + * for write. */ + if (lk->lock_type == LOCKEDKEY_WRITE || locktype == LOCKEDKEY_WRITE) + return C_ERR; + } + + /* Lock allowed: put the client in the list of owners. */ + lockedKeyClient *lkc = zmalloc(sizeof(*lkc)); + lkc->id = c->id; + listAddNodeTail(lk->owners,lkc); + + /* We also need to remember that this client locked this key: for + * now we set this information in the real client locking the key, + * however later this information is moved into the blocked client + * handle that we use for threaded execution. The reason is that we + * want to unlock the keys back when the thread has finished, not just + * when this client disconnects. */ + if (c->locked == NULL) + c->locked = dictCreate(&objectKeyPointerValueDictType,NULL); + + de = dictAddRaw(c->locked,key,NULL); + /* Remember the client ID of the lock, so if we move this in the + * blocked handle, we can still track the original ID even if the + * client is gone. */ + dictSetUnsignedIntegerVal(de,c->id); + incrRefCount(key); + *optr = lk->obj; + return C_OK; +} diff --git a/src/module.c b/src/module.c index 2975b6e5e..5b94c7880 100644 --- a/src/module.c +++ b/src/module.c @@ -7227,6 +7227,9 @@ typedef struct { * threaded Redis core command execution. */ void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) { UNUSED(ctx); + /* TODO: unlock the key here. This can be as simple as putting the + * locked key in an unlock queue or alike if we don't want to do + * it synchronously here. */ zfree(privdata); CoreModuleBlockedClients--; } diff --git a/src/networking.c b/src/networking.c index 77b9a6fcf..73650998c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -153,6 +153,7 @@ client *createClient(connection *conn) { c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL); c->pubsub_patterns = listCreate(); + c->locked = NULL; c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; diff --git a/src/server.c b/src/server.c index f41ac3974..6ce40c4cf 100644 --- a/src/server.c +++ b/src/server.c @@ -2820,6 +2820,7 @@ void initServer(void) { server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); + server.db[j].locked_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); diff --git a/src/server.h b/src/server.h index 598fee8b7..f60ca38d3 100644 --- a/src/server.h +++ b/src/server.h @@ -645,14 +645,55 @@ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ - dict *ready_keys; /* Blocked keys that received a PUSH */ + dict *ready_keys; /* Blocked keys that received a write */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ + dict *locked_keys; /* Keys locked for threaded operations */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; +#define LOCKEDKEY_READ 0 /* Key is locked in read mode. The list of + current_owners may contain multiple + clients. */ +#define LOCKEDKEY_WRITE 1 /* Key is locked in write mode. There is only + one client in the list of owners. */ +#define LOCKEDKEY_WAIT 2 /* Key is not locked, but we need to fork or + to perform a full scan operation such as + DEBUG DIGEST, so we are not allowing keys + to be locked. The owner list is empty while + the wait queue is populated by clients that + want to lock the key. */ + +/* This structure is stored at db->locked_keys for each key that has some + * lock or waiting list active. */ +typedef struct lockedKey { + int lock_type; /* LOCKEDKEY_* defines above. */ + int deleted; /* True if the key was deleted in the + main thread while locked: in this case + it is up to us to free the object once + all the owners will return. Moreover when + this happens, clients in the wait queue can + be rescheduled ASAP. */ + robj *obj; /* The locked object itself. May be NULL + if the key was empty at the time we + locked the key. */ + list *owners; /* List of clients that locked this key + and are doing threaded operations. See + the LOCKEDKEY defines comments for more + information. */ + list *waiting; /* Clients that are waiting for the key + to return available in order to be + rescheduled. */ +} lockedKey; + +/* This is the structure we put in the onwers and wait lists of the + * locked keys. */ +typedef struct lockedKeyClient { + uint64_t id; +} lockedKeyClient; + /* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; @@ -820,6 +861,7 @@ typedef struct client { list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + dict *locked; /* Dictionary of keys locked by the client. */ sds peerid; /* Cached peer ID. */ listNode *client_list_node; /* list node in client list */ RedisModuleUserChangedFunc auth_callback; /* Module callback to execute -- cgit v1.2.1