diff options
author | Viktor Söderqvist <viktor.soderqvist@est.tech> | 2021-08-31 08:25:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-30 23:25:36 -0700 |
commit | f24c63a292e045d4b14b82b25981f00a95c1767a (patch) | |
tree | 8d378dd0e6eb9fc016840e3652236b34fa9a9b3f | |
parent | 1e7ad894d2ecb9ad73d68ff1f1102195582b1469 (diff) | |
download | redis-f24c63a292e045d4b14b82b25981f00a95c1767a.tar.gz |
Slot-to-keys using dict entry metadata (#9356)
* Enhance dict to support arbitrary metadata carried in dictEntry
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
* Rewrite slot-to-keys mapping to linked lists using dict entry metadata
This is a memory enhancement for Redis Cluster.
The radix tree slots_to_keys (which duplicates all key names prefixed with their
slot number) is replaced with a linked list for each slot. The dict entries of
the same cluster slot form a linked list and the pointers are stored as metadata
in each dict entry of the main DB dict.
This commit also moves the slot-to-key API from db.c to cluster.c.
Co-authored-by: Jim Brunner <brunnerj@amazon.com>
-rw-r--r-- | src/cluster.c | 131 | ||||
-rw-r--r-- | src/cluster.h | 26 | ||||
-rw-r--r-- | src/db.c | 140 | ||||
-rw-r--r-- | src/defrag.c | 10 | ||||
-rw-r--r-- | src/dict.c | 12 | ||||
-rw-r--r-- | src/dict.h | 14 | ||||
-rw-r--r-- | src/lazyfree.c | 24 | ||||
-rw-r--r-- | src/server.c | 11 | ||||
-rw-r--r-- | src/server.h | 8 |
9 files changed, 191 insertions, 185 deletions
diff --git a/src/cluster.c b/src/cluster.c index c7e1e66c5..9f5e9e0b3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -77,6 +77,15 @@ uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); const char *clusterGetMessageTypeString(int type); +unsigned int countKeysInSlot(unsigned int hashslot); +unsigned int delKeysInSlot(unsigned int hashslot); + +/* Links to the next and previous entries for keys in the same slot are stored + * in the dict entry metadata. See Slot to Key API below. */ +#define dictEntryNextInSlot(de) \ + (((clusterDictEntryMetadata *)dictMetadata(de))->next) +#define dictEntryPrevInSlot(de) \ + (((clusterDictEntryMetadata *)dictMetadata(de))->prev) #define RCVBUF_INIT_LEN 1024 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */ @@ -572,10 +581,8 @@ void clusterInit(void) { serverPanic("Unrecoverable error creating Redis Cluster socket accept handler."); } - /* The slots -> keys map is a radix tree. Initialize it here. */ - server.cluster->slots_to_keys = raxNew(); - memset(server.cluster->slots_keys_count,0, - sizeof(server.cluster->slots_keys_count)); + /* Reset data for the Slot to key API. */ + slotToKeyFlush(); /* Set myself->port/cport/pport to my listening ports, we'll just need to * discover the IP address via MEET messages. */ @@ -4844,8 +4851,6 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { /* CLUSTER GETKEYSINSLOT <slot> <count> */ long long maxkeys, slot; - unsigned int numkeys, j; - robj **keys; if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK) return; @@ -4857,19 +4862,16 @@ NULL return; } - /* Avoid allocating more than needed in case of large COUNT argument - * and smaller actual number of keys. */ unsigned int keys_in_slot = countKeysInSlot(slot); - if (maxkeys > keys_in_slot) maxkeys = keys_in_slot; - - keys = zmalloc(sizeof(robj*)*maxkeys); - numkeys = getKeysInSlot(slot, keys, maxkeys); + unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c,numkeys); - for (j = 0; j < numkeys; j++) { - addReplyBulk(c,keys[j]); - decrRefCount(keys[j]); + dictEntry *de = server.cluster->slots_to_keys[slot].head; + for (unsigned int j = 0; j < numkeys; j++) { + serverAssert(de != NULL); + sds sdskey = dictGetKey(de); + addReplyBulkCBuffer(c, sdskey, sdslen(sdskey)); + de = dictEntryNextInSlot(de); } - zfree(keys); } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { /* CLUSTER FORGET <NODE ID> */ clusterNode *n = clusterLookupNode(c->argv[2]->ptr); @@ -6099,3 +6101,100 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { } return 0; } + +/* Slot to Key API. This is used by Redis Cluster in order to obtain in + * a fast way a key that belongs to a specified hash slot. This is useful + * while rehashing the cluster and in other conditions when we need to + * understand if we have keys for a given hash slot. */ + +void slotToKeyAddEntry(dictEntry *entry) { + sds key = entry->key; + unsigned int hashslot = keyHashSlot(key, sdslen(key)); + server.cluster->slots_to_keys[hashslot].count++; + + /* Insert entry before the first element in the list. */ + dictEntry *first = server.cluster->slots_to_keys[hashslot].head; + dictEntryNextInSlot(entry) = first; + if (first != NULL) { + serverAssert(dictEntryPrevInSlot(first) == NULL); + dictEntryPrevInSlot(first) = entry; + } + serverAssert(dictEntryPrevInSlot(entry) == NULL); + server.cluster->slots_to_keys[hashslot].head = entry; +} + +void slotToKeyDelEntry(dictEntry *entry) { + sds key = entry->key; + unsigned int hashslot = keyHashSlot(key, sdslen(key)); + server.cluster->slots_to_keys[hashslot].count--; + + /* Connect previous and next entries to each other. */ + dictEntry *next = dictEntryNextInSlot(entry); + dictEntry *prev = dictEntryPrevInSlot(entry); + if (next != NULL) { + dictEntryPrevInSlot(next) = prev; + } + if (prev != NULL) { + dictEntryNextInSlot(prev) = next; + } else { + /* The removed entry was the first in the list. */ + serverAssert(server.cluster->slots_to_keys[hashslot].head == entry); + server.cluster->slots_to_keys[hashslot].head = next; + } +} + +/* Updates neighbour entries when an entry has been replaced (e.g. reallocated + * during active defrag). */ +void slotToKeyReplaceEntry(dictEntry *entry) { + dictEntry *next = dictEntryNextInSlot(entry); + dictEntry *prev = dictEntryPrevInSlot(entry); + if (next != NULL) { + dictEntryPrevInSlot(next) = entry; + } + if (prev != NULL) { + dictEntryNextInSlot(prev) = entry; + } else { + /* The replaced entry was the first in the list. */ + sds key = entry->key; + unsigned int hashslot = keyHashSlot(key, sdslen(key)); + server.cluster->slots_to_keys[hashslot].head = entry; + } +} + +/* Copies the slots-keys map to the specified backup structure. */ +void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup) { + memcpy(backup, server.cluster->slots_to_keys, + sizeof(server.cluster->slots_to_keys)); +} + +/* Overwrites the slots-keys map by copying the provided backup structure. */ +void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup) { + memcpy(server.cluster->slots_to_keys, backup, + sizeof(server.cluster->slots_to_keys)); +} + +/* Empty the slots-keys map of Redis Cluster. */ +void slotToKeyFlush(void) { + memset(&server.cluster->slots_to_keys, 0, + sizeof(server.cluster->slots_to_keys)); +} + +/* Remove all the keys in the specified hash slot. + * The number of removed items is returned. */ +unsigned int delKeysInSlot(unsigned int hashslot) { + unsigned int j = 0; + dictEntry *de = server.cluster->slots_to_keys[hashslot].head; + while (de != NULL) { + sds sdskey = dictGetKey(de); + de = dictEntryNextInSlot(de); + robj *key = createStringObject(sdskey, sdslen(sdskey)); + dbDelete(&server.db[0], key); + decrRefCount(key); + j++; + } + return j; +} + +unsigned int countKeysInSlot(unsigned int hashslot) { + return server.cluster->slots_to_keys[hashslot].count; +} diff --git a/src/cluster.h b/src/cluster.h index 890cd788a..54a1ac25a 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -141,6 +141,23 @@ typedef struct clusterNode { list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode; +/* State for the Slot to Key API, for a single slot. The keys in the same slot + * are linked together using dictEntry metadata. See also "Slot to Key API" in + * cluster.c. */ +struct clusterSlotToKeys { + uint64_t count; /* Number of keys in the slot. */ + dictEntry *head; /* The first key-value entry in the slot. */ +}; +typedef struct clusterSlotToKeys clusterSlotsToKeysData[CLUSTER_SLOTS]; + +/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a + * linked list of the entries belonging to the same slot. */ +typedef struct clusterDictEntryMetadata { + dictEntry *prev; /* Prev entry with key in the same slot */ + dictEntry *next; /* Next entry with key in the same slot */ +} clusterDictEntryMetadata; + + typedef struct clusterState { clusterNode *myself; /* This node */ uint64_t currentEpoch; @@ -151,8 +168,7 @@ typedef struct clusterState { clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; - uint64_t slots_keys_count[CLUSTER_SLOTS]; - rax *slots_to_keys; + clusterSlotsToKeysData slots_to_keys; /* The following fields are used to take the slave state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ @@ -299,5 +315,11 @@ unsigned long getClusterConnectionsCount(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); void clusterPropagatePublish(robj *channel, robj *message); unsigned int keyHashSlot(char *key, int keylen); +void slotToKeyAddEntry(dictEntry *entry); +void slotToKeyDelEntry(dictEntry *entry); +void slotToKeyReplaceEntry(dictEntry *entry); +void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup); +void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup); +void slotToKeyFlush(void); #endif /* __CLUSTER_H */ @@ -38,8 +38,7 @@ /* Database backup. */ struct dbBackup { redisDb *dbarray; - rax *slots_to_keys; - uint64_t slots_keys_count[CLUSTER_SLOTS]; + clusterSlotsToKeysData slots_to_keys; }; /*----------------------------------------------------------------------------- @@ -184,11 +183,11 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); - int retval = dictAdd(db->dict, copy, val); - - serverAssertWithInfo(NULL,key,retval == DICT_OK); + dictEntry *de = dictAddRaw(db->dict, copy, NULL); + serverAssertWithInfo(NULL, key, de != NULL); + dictSetVal(db->dict, de, val); signalKeyAsReady(db, key, val->type); - if (server.cluster_enabled) slotToKeyAdd(key->ptr); + if (server.cluster_enabled) slotToKeyAddEntry(de); } /* This is a special version of dbAdd() that is used only when loading @@ -203,9 +202,10 @@ void dbAdd(redisDb *db, robj *key, robj *val) { * ownership of the SDS string, otherwise 0 is returned, and is up to the * caller to free the SDS string. */ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { - int retval = dictAdd(db->dict, key, val); - if (retval != DICT_OK) return 0; - if (server.cluster_enabled) slotToKeyAdd(key); + dictEntry *de = dictAddRaw(db->dict, key, NULL); + if (de == NULL) return 0; + dictSetVal(db->dict, de, val); + if (server.cluster_enabled) slotToKeyAddEntry(de); return 1; } @@ -313,8 +313,8 @@ int dbSyncDelete(redisDb *db, robj *key) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key,val,db->id); + if (server.cluster_enabled) slotToKeyDelEntry(de); dictFreeUnlinkedEntry(db->dict,de); - if (server.cluster_enabled) slotToKeyDel(key->ptr); return 1; } else { return 0; @@ -441,7 +441,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) { /* Flush slots to keys map if enable cluster, we can flush entire * slots to keys map whatever dbnum because only support one DB * in cluster mode. */ - if (server.cluster_enabled) slotToKeyFlush(async); + if (server.cluster_enabled) slotToKeyFlush(); if (dbnum == -1) flushSlaveKeysWithExpireList(); @@ -469,12 +469,8 @@ dbBackup *backupDb(void) { /* Backup cluster slots to keys map if enable cluster. */ if (server.cluster_enabled) { - backup->slots_to_keys = server.cluster->slots_to_keys; - memcpy(backup->slots_keys_count, server.cluster->slots_keys_count, - sizeof(server.cluster->slots_keys_count)); - server.cluster->slots_to_keys = raxNew(); - memset(server.cluster->slots_keys_count, 0, - sizeof(server.cluster->slots_keys_count)); + slotToKeyCopyToBackup(&backup->slots_to_keys); + slotToKeyFlush(); } moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP, @@ -496,9 +492,6 @@ void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)) { dictRelease(backup->dbarray[i].expires); } - /* Release slots to keys map backup if enable cluster. */ - if (server.cluster_enabled) freeSlotsToKeysMap(backup->slots_to_keys, async); - /* Release backup. */ zfree(backup->dbarray); zfree(backup); @@ -523,13 +516,7 @@ void restoreDbBackup(dbBackup *backup) { } /* Restore slots to keys map backup if enable cluster. */ - if (server.cluster_enabled) { - serverAssert(server.cluster->slots_to_keys->numele == 0); - raxFree(server.cluster->slots_to_keys); - server.cluster->slots_to_keys = backup->slots_to_keys; - memcpy(server.cluster->slots_keys_count, backup->slots_keys_count, - sizeof(server.cluster->slots_keys_count)); - } + if (server.cluster_enabled) slotToKeyRestoreBackup(&backup->slots_to_keys); /* Release backup. */ zfree(backup->dbarray); @@ -1913,102 +1900,3 @@ int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult result->numkeys = num; return num; } - -/* Slot to Key API. This is used by Redis Cluster in order to obtain in - * a fast way a key that belongs to a specified hash slot. This is useful - * while rehashing the cluster and in other conditions when we need to - * understand if we have keys for a given hash slot. */ -void slotToKeyUpdateKey(sds key, int add) { - size_t keylen = sdslen(key); - unsigned int hashslot = keyHashSlot(key,keylen); - unsigned char buf[64]; - unsigned char *indexed = buf; - - server.cluster->slots_keys_count[hashslot] += add ? 1 : -1; - if (keylen+2 > 64) indexed = zmalloc(keylen+2); - indexed[0] = (hashslot >> 8) & 0xff; - indexed[1] = hashslot & 0xff; - memcpy(indexed+2,key,keylen); - if (add) { - raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL); - } else { - raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL); - } - if (indexed != buf) zfree(indexed); -} - -void slotToKeyAdd(sds key) { - slotToKeyUpdateKey(key,1); -} - -void slotToKeyDel(sds key) { - slotToKeyUpdateKey(key,0); -} - -/* Release the radix tree mapping Redis Cluster keys to slots. If 'async' - * is true, we release it asynchronously. */ -void freeSlotsToKeysMap(rax *rt, int async) { - if (async) { - freeSlotsToKeysMapAsync(rt); - } else { - raxFree(rt); - } -} - -/* Empty the slots-keys map of Redis CLuster by creating a new empty one and - * freeing the old one. */ -void slotToKeyFlush(int async) { - rax *old = server.cluster->slots_to_keys; - - server.cluster->slots_to_keys = raxNew(); - memset(server.cluster->slots_keys_count,0, - sizeof(server.cluster->slots_keys_count)); - freeSlotsToKeysMap(old, async); -} - -/* Populate the specified array of objects with keys in the specified slot. - * New objects are returned to represent keys, it's up to the caller to - * decrement the reference count to release the keys names. */ -unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { - raxIterator iter; - int j = 0; - unsigned char indexed[2]; - - indexed[0] = (hashslot >> 8) & 0xff; - indexed[1] = hashslot & 0xff; - raxStart(&iter,server.cluster->slots_to_keys); - raxSeek(&iter,">=",indexed,2); - while(count-- && raxNext(&iter)) { - if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break; - keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2); - } - raxStop(&iter); - return j; -} - -/* Remove all the keys in the specified hash slot. - * The number of removed items is returned. */ -unsigned int delKeysInSlot(unsigned int hashslot) { - raxIterator iter; - int j = 0; - unsigned char indexed[2]; - - indexed[0] = (hashslot >> 8) & 0xff; - indexed[1] = hashslot & 0xff; - raxStart(&iter,server.cluster->slots_to_keys); - while(server.cluster->slots_keys_count[hashslot]) { - raxSeek(&iter,">=",indexed,2); - raxNext(&iter); - - robj *key = createStringObject((char*)iter.key+2,iter.key_len-2); - dbDelete(&server.db[0],key); - decrRefCount(key); - j++; - } - raxStop(&iter); - return j; -} - -unsigned int countKeysInSlot(unsigned int hashslot) { - return server.cluster->slots_keys_count[hashslot]; -} diff --git a/src/defrag.c b/src/defrag.c index c804af370..f5d56078d 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -34,6 +34,7 @@ */ #include "server.h" +#include "cluster.h" #include <time.h> #include <assert.h> #include <stddef.h> @@ -45,7 +46,7 @@ int je_get_defrag_hint(void* ptr); /* forward declarations*/ -void defragDictBucketCallback(void *privdata, dictEntry **bucketref); +void defragDictBucketCallback(dict *d, dictEntry **bucketref); dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); /* Defrag helper for generic allocations. @@ -895,12 +896,15 @@ void defragScanCallback(void *privdata, const dictEntry *de) { /* Defrag scan callback for each hash table bucket, * used in order to defrag the dictEntry allocations. */ -void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { - UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ +void defragDictBucketCallback(dict *d, dictEntry **bucketref) { while(*bucketref) { dictEntry *de = *bucketref, *newde; if ((newde = activeDefragAlloc(de))) { *bucketref = newde; + if (server.cluster_enabled && d == server.db[0].dict) { + /* Cluster keyspace dict. Update slot-to-entries mapping. */ + slotToKeyReplaceEntry(newde); + } } bucketref = &(*bucketref)->next; } diff --git a/src/dict.c b/src/dict.c index a26ba9eba..9b08baaf1 100644 --- a/src/dict.c +++ b/src/dict.c @@ -338,7 +338,11 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) * system it is more likely that recently added entries are accessed * more frequently. */ htidx = dictIsRehashing(d) ? 1 : 0; - entry = zmalloc(sizeof(*entry)); + size_t metasize = dictMetadataSize(d); + entry = zmalloc(sizeof(*entry) + metasize); + if (metasize > 0) { + memset(dictMetadata(entry), 0, metasize); + } entry->next = d->ht_table[htidx][index]; d->ht_table[htidx][index] = entry; d->ht_used[htidx]++; @@ -906,7 +910,7 @@ unsigned long dictScan(dict *d, m0 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx0]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(privdata, &d->ht_table[htidx0][v & m0]); + if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -937,7 +941,7 @@ unsigned long dictScan(dict *d, m1 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx1]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(privdata, &d->ht_table[htidx0][v & m0]); + if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -949,7 +953,7 @@ unsigned long dictScan(dict *d, * of the index pointed to by the cursor in the smaller table */ do { /* Emit entries at cursor */ - if (bucketfn) bucketfn(privdata, &d->ht_table[htidx1][v & m1]); + if (bucketfn) bucketfn(d, &d->ht_table[htidx1][v & m1]); de = d->ht_table[htidx1][v & m1]; while (de) { next = de->next; diff --git a/src/dict.h b/src/dict.h index 495e48f4a..69d4afda6 100644 --- a/src/dict.h +++ b/src/dict.h @@ -52,7 +52,10 @@ typedef struct dictEntry { int64_t s64; double d; } v; - struct dictEntry *next; + struct dictEntry *next; /* Next entry in the same hash bucket. */ + void *metadata[]; /* An arbitrary number of bytes (starting at a + * pointer-aligned address) of size as returned + * by dictType's dictEntryMetadataBytes(). */ } dictEntry; typedef struct dict dict; @@ -65,6 +68,9 @@ typedef struct dictType { void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); + /* Allow a dictEntry to carry extra caller-defined metadata. The + * extra memory is initialized to 0 when a dictEntry is allocated. */ + size_t (*dictEntryMetadataBytes)(dict *d); } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp)) @@ -97,7 +103,7 @@ typedef struct dictIterator { } dictIterator; typedef void (dictScanFunction)(void *privdata, const dictEntry *de); -typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref); +typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref); /* This is the initial size of every hash table */ #define DICT_HT_INITIAL_EXP 2 @@ -140,6 +146,10 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref); (d)->type->keyCompare((d), key1, key2) : \ (key1) == (key2)) +#define dictMetadata(entry) (&(entry)->metadata) +#define dictMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ + ? (d)->type->dictEntryMetadataBytes(d) : 0) + #define dictHashKey(d, key) (d)->type->hashFunction(key) #define dictGetKey(he) ((he)->key) #define dictGetVal(he) ((he)->v.val) diff --git a/src/lazyfree.c b/src/lazyfree.c index 34370cdb3..e793be4cf 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -29,16 +29,6 @@ void lazyfreeFreeDatabase(void *args[]) { atomicIncr(lazyfreed_objects,numkeys); } -/* Release the skiplist mapping Redis Cluster keys to slots in the - * lazyfree thread. */ -void lazyfreeFreeSlotsMap(void *args[]) { - rax *rt = args[0]; - size_t len = rt->numele; - raxFree(rt); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); -} - /* Release the key tracking table. */ void lazyFreeTrackingTable(void *args[]) { rax *rt = args[0]; @@ -177,8 +167,8 @@ int dbAsyncDelete(redisDb *db, robj *key) { /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (de) { + if (server.cluster_enabled) slotToKeyDelEntry(de); dictFreeUnlinkedEntry(db->dict,de); - if (server.cluster_enabled) slotToKeyDel(key->ptr); return 1; } else { return 0; @@ -207,18 +197,6 @@ void emptyDbAsync(redisDb *db) { bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2); } -/* Release the radix tree mapping Redis Cluster keys to slots. - * If the rax is huge enough, free it in async way. */ -void freeSlotsToKeysMapAsync(rax *rt) { - /* Because this rax has only keys and no values so we use numnodes. */ - if (rt->numnodes > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,rt->numele); - bioCreateLazyFreeJob(lazyfreeFreeSlotsMap,1,rt); - } else { - raxFree(rt); - } -} - /* Free the key tracking table. * If the table is huge enough, free it in async way. */ void freeTrackingRadixTreeAsync(rax *tracking) { diff --git a/src/server.c b/src/server.c index d0fa5d0f0..3a198c43f 100644 --- a/src/server.c +++ b/src/server.c @@ -1384,6 +1384,14 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) { } } +/* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the + * metadata is used for constructing a doubly linked list of the dict entries + * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */ +size_t dictEntryMetadataSize(dict *d) { + UNUSED(d); + return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0; +} + /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. */ dictType objectKeyPointerValueDictType = { @@ -1437,7 +1445,8 @@ dictType dbDictType = { dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ dictObjectDestructor, /* val destructor */ - dictExpandAllowed /* allow to expand */ + dictExpandAllowed, /* allow to expand */ + dictEntryMetadataSize /* size of entry metadata in bytes */ }; /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */ diff --git a/src/server.h b/src/server.h index 1d7a84615..1ec441539 100644 --- a/src/server.h +++ b/src/server.h @@ -2426,22 +2426,14 @@ void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)); int selectDb(client *c, int id); void signalModifiedKey(client *c, redisDb *db, robj *key); void signalFlushedDb(int dbid, int async); -unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); -unsigned int countKeysInSlot(unsigned int hashslot); -unsigned int delKeysInSlot(unsigned int hashslot); void scanGenericCommand(client *c, robj *o, unsigned long cursor); int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor); -void slotToKeyAdd(sds key); -void slotToKeyDel(sds key); int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); -void slotToKeyFlush(int async); size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); void lazyfreeResetStats(void); void freeObjAsync(robj *key, robj *obj, int dbid); -void freeSlotsToKeysMapAsync(rax *rt); -void freeSlotsToKeysMap(rax *rt, int async); /* API to get key arguments from commands */ |