author    antirez <antirez@gmail.com>  2020-06-01 13:57:16 +0200
committer antirez <antirez@gmail.com>  2020-06-10 10:40:18 +0200
commit    a30678e6f6699b473ffcc0a4525083d6da608466
tree      f6628c9a3b244174861fc243ba0e42aab231e40c
parent    9c373b2690f30c5cf92e6c52590169d9b26c380a
Threaded core commands: initial lockKey() API.
-rw-r--r--  src/db.c           75
-rw-r--r--  src/module.c        3
-rw-r--r--  src/networking.c    1
-rw-r--r--  src/server.c        1
-rw-r--r--  src/server.h       44
5 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/src/db.c b/src/db.c
index dc4a0b63e..b7e349adb 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1749,3 +1749,78 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
 unsigned int countKeysInSlot(unsigned int hashslot) {
     return server.cluster->slots_keys_count[hashslot];
 }
+
+/*-----------------------------------------------------------------------------
+ * Locked keys API
+ *----------------------------------------------------------------------------*/
+
+int lockedClientListMatch(void *a, void *b) {
+    lockedKeyClient *ca = a, *cb = b;
+    return ca->id == cb->id;
+}
+
+/* Lock the specified key in the specified DB, in the context of the
+ * specified client, and return the locked object by reference.
+ * The type of the lock can be LOCKEDKEY_READ or LOCKEDKEY_WRITE (XXX:
+ * write locks are not supported right now).
+ *
+ * Locking the key may fail because the key is already locked, or because
+ * Redis is not able to lock keys at the moment (for instance because it
+ * is waiting to fork, or for other reasons). When this happens, the
+ * command should signal to the caller that it needs to be rescheduled,
+ * or alternatively it may perform the operation synchronously without
+ * locking any key.
+ *
+ * When locking fails C_ERR is returned, otherwise C_OK is returned.
+ * Note that '*optr' may be populated with NULL if the key we are going
+ * to lock is empty.
+ *
+ * Signaling the need to lock keys is performed using the
+ * wouldLockKey() API (XXX: yet to be implemented). Check the function
+ * documentation for more information. */
+int lockKey(client *c, redisDb *db, robj *key, int locktype, robj **optr) {
+    dictEntry *de;
+
+    de = dictFind(db->locked_keys,key);
+    lockedKey *lk = de ? dictGetVal(de) : NULL;
+
+    if (lk == NULL) {
+        lk = zmalloc(sizeof(*lk));
+        lk->lock_type = locktype;
+        lk->deleted = 0;
+        lk->waiting = listCreate();
+        lk->owners = listCreate();
+        listSetMatchMethod(lk->waiting,lockedClientListMatch);
+        listSetMatchMethod(lk->owners,lockedClientListMatch);
+        listSetFreeMethod(lk->waiting,zfree);
+        listSetFreeMethod(lk->owners,zfree);
+        lk->obj = lookupKeyReadWithFlags(db,key,LOOKUP_NOTOUCH);
+        /* Add the new lock to the locked keys dictionary, taking a
+         * reference to the key object. */
+        incrRefCount(key);
+        dictAdd(db->locked_keys,key,lk);
+    } else {
+        /* If there is already a lock, it is incompatible with the new
+         * lock if either one is a write lock, or if the key is in the
+         * LOCKEDKEY_WAIT state. */
+        if (lk->lock_type != LOCKEDKEY_READ || locktype == LOCKEDKEY_WRITE)
+            return C_ERR;
+    }
+
+    /* Lock granted: put the client in the list of owners. */
+    lockedKeyClient *lkc = zmalloc(sizeof(*lkc));
+    lkc->id = c->id;
+    listAddNodeTail(lk->owners,lkc);
+
+    /* We also need to remember that this client locked this key: for
+     * now we store this information in the actual client locking the
+     * key, however later this information will be moved into the
+     * blocked client handle that we use for threaded execution. The
+     * reason is that we want to unlock the keys when the thread has
+     * finished, not just when this client disconnects. */
+    if (c->locked == NULL)
+        c->locked = dictCreate(&objectKeyPointerValueDictType,NULL);
+
+    de = dictAddRaw(c->locked,key,NULL);
+    serverAssert(de != NULL); /* The same client can't lock a key twice. */
+    /* Remember the client ID of the lock, so that if we move this into
+     * the blocked client handle, we can still track the original ID
+     * even after the client is gone. */
+    dictSetUnsignedIntegerVal(de,c->id);
+    incrRefCount(key);
+    *optr = lk->obj;
+    return C_OK;
+}
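
To make the contract above concrete, here is a minimal sketch, not part of
this patch, of how a command might consume lockKey(). Only lockKey() and the
LOCKEDKEY_* constants come from this commit: the getThreadedCommand name is
hypothetical, the C_ERR path falls back to the existing synchronous
getGenericCommand() as the comment above suggests, and the reply is sent
in-line because the threaded hand-off and the unlock step do not exist yet
(see the TODO in module.c below).

    void getThreadedCommand(client *c) {
        robj *o;

        /* Try to acquire a read lock on the key. */
        if (lockKey(c,c->db,c->argv[1],LOCKEDKEY_READ,&o) == C_ERR) {
            /* Key already locked in an incompatible way, or locking is
             * currently not allowed: run the plain synchronous
             * implementation instead. */
            getGenericCommand(c);
            return;
        }
        if (o == NULL) {
            /* The lock was acquired, but the key does not exist. */
            addReplyNull(c);
        } else if (o->type != OBJ_STRING) {
            addReply(c,shared.wrongtypeerr);
        } else {
            /* Here 'o' would be handed to a worker thread while the key
             * stays locked; this sketch just replies in-line. */
            addReplyBulk(c,o);
        }
        /* Missing: the unlock counterpart, still a TODO in this patch. */
    }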
diff --git a/src/module.c b/src/module.c
index 2975b6e5e..5b94c7880 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7227,6 +7227,9 @@ typedef struct {
  * threaded Redis core command execution. */
 void threadedCoreCommandFreePrivdata(RedisModuleCtx *ctx, void *privdata) {
     UNUSED(ctx);
+    /* TODO: unlock the key here. This can be as simple as putting the
+     * locked key into an unlock queue or similar, if we don't want to
+     * do it synchronously here. */
     zfree(privdata);
     CoreModuleBlockedClients--;
 }
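
The following sketch, not part of this patch, shows one possible shape for
the unlock step the TODO above refers to, based on the lockedKey structure
this commit adds to server.h (below). The unlockKey() name and the handling
of the 'deleted' flag are assumptions, and the c->locked bookkeeping on the
client side is omitted for brevity.

    void unlockKey(client *c, redisDb *db, robj *key) {
        dictEntry *de = dictFind(db->locked_keys,key);
        if (de == NULL) return;         /* Key is not locked. */
        lockedKey *lk = dictGetVal(de);

        /* Remove the calling client from the owners list: the list
         * free method (zfree) releases the lockedKeyClient handle. */
        lockedKeyClient lkc = {.id = c->id};
        listNode *ln = listSearchKey(lk->owners,&lkc);
        if (ln) listDelNode(lk->owners,ln);

        if (listLength(lk->owners) == 0) {
            /* Last owner returned. If the main thread deleted the key
             * while it was locked, it is up to us to free the object.
             * Clients in the waiting list would be rescheduled here. */
            if (lk->deleted && lk->obj) decrRefCount(lk->obj);
            listRelease(lk->owners);
            listRelease(lk->waiting);
            zfree(lk);
            dictDelete(db->locked_keys,key);
        }
    }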
diff --git a/src/networking.c b/src/networking.c
index 77b9a6fcf..73650998c 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -153,6 +153,7 @@ client *createClient(connection *conn) {
     c->watched_keys = listCreate();
     c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
     c->pubsub_patterns = listCreate();
+    c->locked = NULL;
     c->peerid = NULL;
     c->client_list_node = NULL;
     c->client_tracking_redirection = 0;
diff --git a/src/server.c b/src/server.c
index f41ac3974..6ce40c4cf 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2820,6 +2820,7 @@ void initServer(void) {
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].locked_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
         server.db[j].id = j;
         server.db[j].avg_ttl = 0;
         server.db[j].defrag_later = listCreate();
diff --git a/src/server.h b/src/server.h
index 598fee8b7..f60ca38d3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -645,14 +645,55 @@ typedef struct redisDb {
     dict *dict;                 /* The keyspace for this DB */
     dict *expires;              /* Timeout of keys with a timeout set */
     dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
-    dict *ready_keys;           /* Blocked keys that received a PUSH */
+    dict *ready_keys;           /* Blocked keys that received a write */
     dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
+    dict *locked_keys;          /* Keys locked for threaded operations. */
     int id;                     /* Database ID */
     long long avg_ttl;          /* Average TTL, just for stats */
     unsigned long expires_cursor; /* Cursor of the active expire cycle. */
     list *defrag_later;         /* List of key names to attempt to defrag
                                    one by one, gradually. */
 } redisDb;
+
+#define LOCKEDKEY_READ 0    /* Key is locked in read mode. The owners
+                               list may contain multiple clients. */
+#define LOCKEDKEY_WRITE 1   /* Key is locked in write mode. There is
+                               only one client in the owners list. */
+#define LOCKEDKEY_WAIT 2    /* Key is not locked, but we need to fork
+                               or to perform a full scan operation such
+                               as DEBUG DIGEST, so we are not allowing
+                               keys to be locked. The owners list is
+                               empty while the wait queue is populated
+                               by clients that want to lock the key. */
+
+/* This structure is stored at db->locked_keys for each key that has
+ * some lock or waiting list active. */
+typedef struct lockedKey {
+    int lock_type;      /* LOCKEDKEY_* defines above. */
+    int deleted;        /* True if the key was deleted in the main
+                           thread while locked: in this case it is up
+                           to us to free the object once all the owners
+                           have returned. Moreover, when this happens,
+                           clients in the wait queue can be rescheduled
+                           ASAP. */
+    robj *obj;          /* The locked object itself. May be NULL if the
+                           key was empty at the time we locked it. */
+    list *owners;       /* List of clients that locked this key and are
+                           performing threaded operations. See the
+                           LOCKEDKEY_* defines comments for more
+                           information. */
+    list *waiting;      /* Clients that are waiting for the key to
+                           become available again in order to be
+                           rescheduled. */
+} lockedKey;
+
+/* This is the structure we put in the owners and waiting lists of the
+ * locked keys. */
+typedef struct lockedKeyClient {
+    uint64_t id;
+} lockedKeyClient;
+
 /* Client MULTI/EXEC state */
 typedef struct multiCmd {
     robj **argv;
@@ -820,6 +861,7 @@ typedef struct client {
     list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
     dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
     list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
+    dict *locked;           /* Dictionary of keys locked by the client. */
     sds peerid;             /* Cached peer ID. */
     listNode *client_list_node; /* list node in client list */
     RedisModuleUserChangedFunc auth_callback; /* Module callback to execute