summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Viktor Söderqvist <viktor.soderqvist@est.tech>, 2021-08-31 08:25:36 +0200
committer: GitHub <noreply@github.com>, 2021-08-30 23:25:36 -0700
commit: f24c63a292e045d4b14b82b25981f00a95c1767a (patch)
tree: 8d378dd0e6eb9fc016840e3652236b34fa9a9b3f
parent: 1e7ad894d2ecb9ad73d68ff1f1102195582b1469 (diff)
download: redis-f24c63a292e045d4b14b82b25981f00a95c1767a.tar.gz
Slot-to-keys using dict entry metadata (#9356)
* Enhance dict to support arbitrary metadata carried in dictEntry

  Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>

* Rewrite slot-to-keys mapping to linked lists using dict entry metadata

  This is a memory enhancement for Redis Cluster. The radix tree slots_to_keys (which duplicates all key names prefixed with their slot number) is replaced with a linked list for each slot. The dict entries of the same cluster slot form a linked list and the pointers are stored as metadata in each dict entry of the main DB dict.

  This commit also moves the slot-to-key API from db.c to cluster.c.

  Co-authored-by: Jim Brunner <brunnerj@amazon.com>
-rw-r--r-- src/cluster.c 131
-rw-r--r-- src/cluster.h 26
-rw-r--r-- src/db.c 140
-rw-r--r-- src/defrag.c 10
-rw-r--r-- src/dict.c 12
-rw-r--r-- src/dict.h 14
-rw-r--r-- src/lazyfree.c 24
-rw-r--r-- src/server.c 11
-rw-r--r-- src/server.h 8
9 files changed, 191 insertions, 185 deletions
diff --git a/src/cluster.c b/src/cluster.c
index c7e1e66c5..9f5e9e0b3 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -77,6 +77,15 @@ uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
+unsigned int countKeysInSlot(unsigned int hashslot);
+unsigned int delKeysInSlot(unsigned int hashslot);
+
+/* Links to the next and previous entries for keys in the same slot are stored
+ * in the dict entry metadata. See Slot to Key API below. */
+#define dictEntryNextInSlot(de) \
+ (((clusterDictEntryMetadata *)dictMetadata(de))->next)
+#define dictEntryPrevInSlot(de) \
+ (((clusterDictEntryMetadata *)dictMetadata(de))->prev)
#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
@@ -572,10 +581,8 @@ void clusterInit(void) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
- /* The slots -> keys map is a radix tree. Initialize it here. */
- server.cluster->slots_to_keys = raxNew();
- memset(server.cluster->slots_keys_count,0,
- sizeof(server.cluster->slots_keys_count));
+ /* Reset data for the Slot to key API. */
+ slotToKeyFlush();
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
@@ -4844,8 +4851,6 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
/* CLUSTER GETKEYSINSLOT <slot> <count> */
long long maxkeys, slot;
- unsigned int numkeys, j;
- robj **keys;
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
@@ -4857,19 +4862,16 @@ NULL
return;
}
- /* Avoid allocating more than needed in case of large COUNT argument
- * and smaller actual number of keys. */
unsigned int keys_in_slot = countKeysInSlot(slot);
- if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
-
- keys = zmalloc(sizeof(robj*)*maxkeys);
- numkeys = getKeysInSlot(slot, keys, maxkeys);
+ unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
- for (j = 0; j < numkeys; j++) {
- addReplyBulk(c,keys[j]);
- decrRefCount(keys[j]);
+ dictEntry *de = server.cluster->slots_to_keys[slot].head;
+ for (unsigned int j = 0; j < numkeys; j++) {
+ serverAssert(de != NULL);
+ sds sdskey = dictGetKey(de);
+ addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
+ de = dictEntryNextInSlot(de);
}
- zfree(keys);
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
@@ -6099,3 +6101,100 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
}
return 0;
}
+
+/* Slot to Key API. This is used by Redis Cluster in order to obtain in
+ * a fast way a key that belongs to a specified hash slot. This is useful
+ * while rehashing the cluster and in other conditions when we need to
+ * understand if we have keys for a given hash slot. */
+
+void slotToKeyAddEntry(dictEntry *entry) {
+ sds key = entry->key;
+ unsigned int hashslot = keyHashSlot(key, sdslen(key));
+ server.cluster->slots_to_keys[hashslot].count++;
+
+ /* Insert entry before the first element in the list. */
+ dictEntry *first = server.cluster->slots_to_keys[hashslot].head;
+ dictEntryNextInSlot(entry) = first;
+ if (first != NULL) {
+ serverAssert(dictEntryPrevInSlot(first) == NULL);
+ dictEntryPrevInSlot(first) = entry;
+ }
+ serverAssert(dictEntryPrevInSlot(entry) == NULL);
+ server.cluster->slots_to_keys[hashslot].head = entry;
+}
+
+void slotToKeyDelEntry(dictEntry *entry) {
+ sds key = entry->key;
+ unsigned int hashslot = keyHashSlot(key, sdslen(key));
+ server.cluster->slots_to_keys[hashslot].count--;
+
+ /* Connect previous and next entries to each other. */
+ dictEntry *next = dictEntryNextInSlot(entry);
+ dictEntry *prev = dictEntryPrevInSlot(entry);
+ if (next != NULL) {
+ dictEntryPrevInSlot(next) = prev;
+ }
+ if (prev != NULL) {
+ dictEntryNextInSlot(prev) = next;
+ } else {
+ /* The removed entry was the first in the list. */
+ serverAssert(server.cluster->slots_to_keys[hashslot].head == entry);
+ server.cluster->slots_to_keys[hashslot].head = next;
+ }
+}
+
+/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
+ * during active defrag). */
+void slotToKeyReplaceEntry(dictEntry *entry) {
+ dictEntry *next = dictEntryNextInSlot(entry);
+ dictEntry *prev = dictEntryPrevInSlot(entry);
+ if (next != NULL) {
+ dictEntryPrevInSlot(next) = entry;
+ }
+ if (prev != NULL) {
+ dictEntryNextInSlot(prev) = entry;
+ } else {
+ /* The replaced entry was the first in the list. */
+ sds key = entry->key;
+ unsigned int hashslot = keyHashSlot(key, sdslen(key));
+ server.cluster->slots_to_keys[hashslot].head = entry;
+ }
+}
+
+/* Copies the slots-keys map to the specified backup structure. */
+void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup) {
+ memcpy(backup, server.cluster->slots_to_keys,
+ sizeof(server.cluster->slots_to_keys));
+}
+
+/* Overwrites the slots-keys map by copying the provided backup structure. */
+void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup) {
+ memcpy(server.cluster->slots_to_keys, backup,
+ sizeof(server.cluster->slots_to_keys));
+}
+
+/* Empty the slots-keys map of Redis Cluster. */
+void slotToKeyFlush(void) {
+ memset(&server.cluster->slots_to_keys, 0,
+ sizeof(server.cluster->slots_to_keys));
+}
+
+/* Remove all the keys in the specified hash slot.
+ * The number of removed items is returned. */
+unsigned int delKeysInSlot(unsigned int hashslot) {
+ unsigned int j = 0;
+ dictEntry *de = server.cluster->slots_to_keys[hashslot].head;
+ while (de != NULL) {
+ sds sdskey = dictGetKey(de);
+ de = dictEntryNextInSlot(de);
+ robj *key = createStringObject(sdskey, sdslen(sdskey));
+ dbDelete(&server.db[0], key);
+ decrRefCount(key);
+ j++;
+ }
+ return j;
+}
+
+unsigned int countKeysInSlot(unsigned int hashslot) {
+ return server.cluster->slots_to_keys[hashslot].count;
+}
diff --git a/src/cluster.h b/src/cluster.h
index 890cd788a..54a1ac25a 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -141,6 +141,23 @@ typedef struct clusterNode {
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
+/* State for the Slot to Key API, for a single slot. The keys in the same slot
+ * are linked together using dictEntry metadata. See also "Slot to Key API" in
+ * cluster.c. */
+struct clusterSlotToKeys {
+ uint64_t count; /* Number of keys in the slot. */
+ dictEntry *head; /* The first key-value entry in the slot. */
+};
+typedef struct clusterSlotToKeys clusterSlotsToKeysData[CLUSTER_SLOTS];
+
+/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a
+ * linked list of the entries belonging to the same slot. */
+typedef struct clusterDictEntryMetadata {
+ dictEntry *prev; /* Prev entry with key in the same slot */
+ dictEntry *next; /* Next entry with key in the same slot */
+} clusterDictEntryMetadata;
+
+
typedef struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
@@ -151,8 +168,7 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
- uint64_t slots_keys_count[CLUSTER_SLOTS];
- rax *slots_to_keys;
+ clusterSlotsToKeysData slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
@@ -299,5 +315,11 @@ unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message);
unsigned int keyHashSlot(char *key, int keylen);
+void slotToKeyAddEntry(dictEntry *entry);
+void slotToKeyDelEntry(dictEntry *entry);
+void slotToKeyReplaceEntry(dictEntry *entry);
+void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup);
+void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup);
+void slotToKeyFlush(void);
#endif /* __CLUSTER_H */
diff --git a/src/db.c b/src/db.c
index 94a03e8e0..efee7feea 100644
--- a/src/db.c
+++ b/src/db.c
@@ -38,8 +38,7 @@
/* Database backup. */
struct dbBackup {
redisDb *dbarray;
- rax *slots_to_keys;
- uint64_t slots_keys_count[CLUSTER_SLOTS];
+ clusterSlotsToKeysData slots_to_keys;
};
/*-----------------------------------------------------------------------------
@@ -184,11 +183,11 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
- int retval = dictAdd(db->dict, copy, val);
-
- serverAssertWithInfo(NULL,key,retval == DICT_OK);
+ dictEntry *de = dictAddRaw(db->dict, copy, NULL);
+ serverAssertWithInfo(NULL, key, de != NULL);
+ dictSetVal(db->dict, de, val);
signalKeyAsReady(db, key, val->type);
- if (server.cluster_enabled) slotToKeyAdd(key->ptr);
+ if (server.cluster_enabled) slotToKeyAddEntry(de);
}
/* This is a special version of dbAdd() that is used only when loading
@@ -203,9 +202,10 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
* ownership of the SDS string, otherwise 0 is returned, and is up to the
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
- int retval = dictAdd(db->dict, key, val);
- if (retval != DICT_OK) return 0;
- if (server.cluster_enabled) slotToKeyAdd(key);
+ dictEntry *de = dictAddRaw(db->dict, key, NULL);
+ if (de == NULL) return 0;
+ dictSetVal(db->dict, de, val);
+ if (server.cluster_enabled) slotToKeyAddEntry(de);
return 1;
}
@@ -313,8 +313,8 @@ int dbSyncDelete(redisDb *db, robj *key) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id);
+ if (server.cluster_enabled) slotToKeyDelEntry(de);
dictFreeUnlinkedEntry(db->dict,de);
- if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
@@ -441,7 +441,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
/* Flush slots to keys map if enable cluster, we can flush entire
* slots to keys map whatever dbnum because only support one DB
* in cluster mode. */
- if (server.cluster_enabled) slotToKeyFlush(async);
+ if (server.cluster_enabled) slotToKeyFlush();
if (dbnum == -1) flushSlaveKeysWithExpireList();
@@ -469,12 +469,8 @@ dbBackup *backupDb(void) {
/* Backup cluster slots to keys map if enable cluster. */
if (server.cluster_enabled) {
- backup->slots_to_keys = server.cluster->slots_to_keys;
- memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
- sizeof(server.cluster->slots_keys_count));
- server.cluster->slots_to_keys = raxNew();
- memset(server.cluster->slots_keys_count, 0,
- sizeof(server.cluster->slots_keys_count));
+ slotToKeyCopyToBackup(&backup->slots_to_keys);
+ slotToKeyFlush();
}
moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
@@ -496,9 +492,6 @@ void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)) {
dictRelease(backup->dbarray[i].expires);
}
- /* Release slots to keys map backup if enable cluster. */
- if (server.cluster_enabled) freeSlotsToKeysMap(backup->slots_to_keys, async);
-
/* Release backup. */
zfree(backup->dbarray);
zfree(backup);
@@ -523,13 +516,7 @@ void restoreDbBackup(dbBackup *backup) {
}
/* Restore slots to keys map backup if enable cluster. */
- if (server.cluster_enabled) {
- serverAssert(server.cluster->slots_to_keys->numele == 0);
- raxFree(server.cluster->slots_to_keys);
- server.cluster->slots_to_keys = backup->slots_to_keys;
- memcpy(server.cluster->slots_keys_count, backup->slots_keys_count,
- sizeof(server.cluster->slots_keys_count));
- }
+ if (server.cluster_enabled) slotToKeyRestoreBackup(&backup->slots_to_keys);
/* Release backup. */
zfree(backup->dbarray);
@@ -1913,102 +1900,3 @@ int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult
result->numkeys = num;
return num;
}
-
-/* Slot to Key API. This is used by Redis Cluster in order to obtain in
- * a fast way a key that belongs to a specified hash slot. This is useful
- * while rehashing the cluster and in other conditions when we need to
- * understand if we have keys for a given hash slot. */
-void slotToKeyUpdateKey(sds key, int add) {
- size_t keylen = sdslen(key);
- unsigned int hashslot = keyHashSlot(key,keylen);
- unsigned char buf[64];
- unsigned char *indexed = buf;
-
- server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
- if (keylen+2 > 64) indexed = zmalloc(keylen+2);
- indexed[0] = (hashslot >> 8) & 0xff;
- indexed[1] = hashslot & 0xff;
- memcpy(indexed+2,key,keylen);
- if (add) {
- raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
- } else {
- raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL);
- }
- if (indexed != buf) zfree(indexed);
-}
-
-void slotToKeyAdd(sds key) {
- slotToKeyUpdateKey(key,1);
-}
-
-void slotToKeyDel(sds key) {
- slotToKeyUpdateKey(key,0);
-}
-
-/* Release the radix tree mapping Redis Cluster keys to slots. If 'async'
- * is true, we release it asynchronously. */
-void freeSlotsToKeysMap(rax *rt, int async) {
- if (async) {
- freeSlotsToKeysMapAsync(rt);
- } else {
- raxFree(rt);
- }
-}
-
-/* Empty the slots-keys map of Redis CLuster by creating a new empty one and
- * freeing the old one. */
-void slotToKeyFlush(int async) {
- rax *old = server.cluster->slots_to_keys;
-
- server.cluster->slots_to_keys = raxNew();
- memset(server.cluster->slots_keys_count,0,
- sizeof(server.cluster->slots_keys_count));
- freeSlotsToKeysMap(old, async);
-}
-
-/* Populate the specified array of objects with keys in the specified slot.
- * New objects are returned to represent keys, it's up to the caller to
- * decrement the reference count to release the keys names. */
-unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
- raxIterator iter;
- int j = 0;
- unsigned char indexed[2];
-
- indexed[0] = (hashslot >> 8) & 0xff;
- indexed[1] = hashslot & 0xff;
- raxStart(&iter,server.cluster->slots_to_keys);
- raxSeek(&iter,">=",indexed,2);
- while(count-- && raxNext(&iter)) {
- if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
- keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
- }
- raxStop(&iter);
- return j;
-}
-
-/* Remove all the keys in the specified hash slot.
- * The number of removed items is returned. */
-unsigned int delKeysInSlot(unsigned int hashslot) {
- raxIterator iter;
- int j = 0;
- unsigned char indexed[2];
-
- indexed[0] = (hashslot >> 8) & 0xff;
- indexed[1] = hashslot & 0xff;
- raxStart(&iter,server.cluster->slots_to_keys);
- while(server.cluster->slots_keys_count[hashslot]) {
- raxSeek(&iter,">=",indexed,2);
- raxNext(&iter);
-
- robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
- dbDelete(&server.db[0],key);
- decrRefCount(key);
- j++;
- }
- raxStop(&iter);
- return j;
-}
-
-unsigned int countKeysInSlot(unsigned int hashslot) {
- return server.cluster->slots_keys_count[hashslot];
-}
diff --git a/src/defrag.c b/src/defrag.c
index c804af370..f5d56078d 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -34,6 +34,7 @@
*/
#include "server.h"
+#include "cluster.h"
#include <time.h>
#include <assert.h>
#include <stddef.h>
@@ -45,7 +46,7 @@
int je_get_defrag_hint(void* ptr);
/* forward declarations*/
-void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
+void defragDictBucketCallback(dict *d, dictEntry **bucketref);
dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
/* Defrag helper for generic allocations.
@@ -895,12 +896,15 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
/* Defrag scan callback for each hash table bucket,
* used in order to defrag the dictEntry allocations. */
-void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
- UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
+void defragDictBucketCallback(dict *d, dictEntry **bucketref) {
while(*bucketref) {
dictEntry *de = *bucketref, *newde;
if ((newde = activeDefragAlloc(de))) {
*bucketref = newde;
+ if (server.cluster_enabled && d == server.db[0].dict) {
+ /* Cluster keyspace dict. Update slot-to-entries mapping. */
+ slotToKeyReplaceEntry(newde);
+ }
}
bucketref = &(*bucketref)->next;
}
diff --git a/src/dict.c b/src/dict.c
index a26ba9eba..9b08baaf1 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -338,7 +338,11 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
* system it is more likely that recently added entries are accessed
* more frequently. */
htidx = dictIsRehashing(d) ? 1 : 0;
- entry = zmalloc(sizeof(*entry));
+ size_t metasize = dictMetadataSize(d);
+ entry = zmalloc(sizeof(*entry) + metasize);
+ if (metasize > 0) {
+ memset(dictMetadata(entry), 0, metasize);
+ }
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
@@ -906,7 +910,7 @@ unsigned long dictScan(dict *d,
m0 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx0]);
/* Emit entries at cursor */
- if (bucketfn) bucketfn(privdata, &d->ht_table[htidx0][v & m0]);
+ if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]);
de = d->ht_table[htidx0][v & m0];
while (de) {
next = de->next;
@@ -937,7 +941,7 @@ unsigned long dictScan(dict *d,
m1 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx1]);
/* Emit entries at cursor */
- if (bucketfn) bucketfn(privdata, &d->ht_table[htidx0][v & m0]);
+ if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]);
de = d->ht_table[htidx0][v & m0];
while (de) {
next = de->next;
@@ -949,7 +953,7 @@ unsigned long dictScan(dict *d,
* of the index pointed to by the cursor in the smaller table */
do {
/* Emit entries at cursor */
- if (bucketfn) bucketfn(privdata, &d->ht_table[htidx1][v & m1]);
+ if (bucketfn) bucketfn(d, &d->ht_table[htidx1][v & m1]);
de = d->ht_table[htidx1][v & m1];
while (de) {
next = de->next;
diff --git a/src/dict.h b/src/dict.h
index 495e48f4a..69d4afda6 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -52,7 +52,10 @@ typedef struct dictEntry {
int64_t s64;
double d;
} v;
- struct dictEntry *next;
+ struct dictEntry *next; /* Next entry in the same hash bucket. */
+ void *metadata[]; /* An arbitrary number of bytes (starting at a
+ * pointer-aligned address) of size as returned
+ * by dictType's dictEntryMetadataBytes(). */
} dictEntry;
typedef struct dict dict;
@@ -65,6 +68,9 @@ typedef struct dictType {
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
+ /* Allow a dictEntry to carry extra caller-defined metadata. The
+ * extra memory is initialized to 0 when a dictEntry is allocated. */
+ size_t (*dictEntryMetadataBytes)(dict *d);
} dictType;
#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp))
@@ -97,7 +103,7 @@ typedef struct dictIterator {
} dictIterator;
typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
-typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
+typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref);
/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_EXP 2
@@ -140,6 +146,10 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
(d)->type->keyCompare((d), key1, key2) : \
(key1) == (key2))
+#define dictMetadata(entry) (&(entry)->metadata)
+#define dictMetadataSize(d) ((d)->type->dictEntryMetadataBytes \
+ ? (d)->type->dictEntryMetadataBytes(d) : 0)
+
#define dictHashKey(d, key) (d)->type->hashFunction(key)
#define dictGetKey(he) ((he)->key)
#define dictGetVal(he) ((he)->v.val)
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 34370cdb3..e793be4cf 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -29,16 +29,6 @@ void lazyfreeFreeDatabase(void *args[]) {
atomicIncr(lazyfreed_objects,numkeys);
}
-/* Release the skiplist mapping Redis Cluster keys to slots in the
- * lazyfree thread. */
-void lazyfreeFreeSlotsMap(void *args[]) {
- rax *rt = args[0];
- size_t len = rt->numele;
- raxFree(rt);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = args[0];
@@ -177,8 +167,8 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
+ if (server.cluster_enabled) slotToKeyDelEntry(de);
dictFreeUnlinkedEntry(db->dict,de);
- if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
@@ -207,18 +197,6 @@ void emptyDbAsync(redisDb *db) {
bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
}
-/* Release the radix tree mapping Redis Cluster keys to slots.
- * If the rax is huge enough, free it in async way. */
-void freeSlotsToKeysMapAsync(rax *rt) {
- /* Because this rax has only keys and no values so we use numnodes. */
- if (rt->numnodes > LAZYFREE_THRESHOLD) {
- atomicIncr(lazyfree_objects,rt->numele);
- bioCreateLazyFreeJob(lazyfreeFreeSlotsMap,1,rt);
- } else {
- raxFree(rt);
- }
-}
-
/* Free the key tracking table.
* If the table is huge enough, free it in async way. */
void freeTrackingRadixTreeAsync(rax *tracking) {
diff --git a/src/server.c b/src/server.c
index d0fa5d0f0..3a198c43f 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1384,6 +1384,14 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
}
}
+/* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the
+ * metadata is used for constructing a doubly linked list of the dict entries
+ * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */
+size_t dictEntryMetadataSize(dict *d) {
+ UNUSED(d);
+ return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0;
+}
+
/* Generic hash table type where keys are Redis Objects, Values
* dummy pointers. */
dictType objectKeyPointerValueDictType = {
@@ -1437,7 +1445,8 @@ dictType dbDictType = {
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor, /* val destructor */
- dictExpandAllowed /* allow to expand */
+ dictExpandAllowed, /* allow to expand */
+ dictEntryMetadataSize /* size of entry metadata in bytes */
};
/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
diff --git a/src/server.h b/src/server.h
index 1d7a84615..1ec441539 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2426,22 +2426,14 @@ void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*));
int selectDb(client *c, int id);
void signalModifiedKey(client *c, redisDb *db, robj *key);
void signalFlushedDb(int dbid, int async);
-unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
-unsigned int countKeysInSlot(unsigned int hashslot);
-unsigned int delKeysInSlot(unsigned int hashslot);
void scanGenericCommand(client *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
-void slotToKeyAdd(sds key);
-void slotToKeyDel(sds key);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
-void slotToKeyFlush(int async);
size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);
void freeObjAsync(robj *key, robj *obj, int dbid);
-void freeSlotsToKeysMapAsync(rax *rt);
-void freeSlotsToKeysMap(rax *rt, int async);
/* API to get key arguments from commands */