diff options
-rw-r--r-- | src/cluster.c | 10 | ||||
-rw-r--r-- | src/cluster.h | 5 | ||||
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/defrag.c | 105 | ||||
-rw-r--r-- | src/dict.c | 89 | ||||
-rw-r--r-- | src/dict.h | 30 | ||||
-rw-r--r-- | src/expire.c | 2 | ||||
-rw-r--r-- | src/module.c | 4 | ||||
-rw-r--r-- | src/object.c | 3 | ||||
-rw-r--r-- | src/server.c | 17 |
10 files changed, 158 insertions, 109 deletions
diff --git a/src/cluster.c b/src/cluster.c index ecebc511b..82f3e36a8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -93,9 +93,9 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); /* Links to the next and previous entries for keys in the same slot are stored * in the dict entry metadata. See Slot to Key API below. */ #define dictEntryNextInSlot(de) \ - (((clusterDictEntryMetadata *)dictMetadata(de))->next) + (((clusterDictEntryMetadata *)dictEntryMetadata(de))->next) #define dictEntryPrevInSlot(de) \ - (((clusterDictEntryMetadata *)dictMetadata(de))->prev) + (((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev) #define RCVBUF_INIT_LEN 1024 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */ @@ -7327,7 +7327,7 @@ void slotToKeyDelEntry(dictEntry *entry, redisDb *db) { /* Updates neighbour entries when an entry has been replaced (e.g. reallocated * during active defrag). */ -void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { +void slotToKeyReplaceEntry(dict *d, dictEntry *entry) { dictEntry *next = dictEntryNextInSlot(entry); dictEntry *prev = dictEntryPrevInSlot(entry); if (next != NULL) { @@ -7339,6 +7339,8 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { /* The replaced entry was the first in the list. */ sds key = dictGetKey(entry); unsigned int hashslot = keyHashSlot(key, sdslen(key)); + clusterDictMetadata *dictmeta = dictMetadata(d); + redisDb *db = dictmeta->db; slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; slot_to_keys->head = entry; } @@ -7347,6 +7349,8 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { /* Initialize slots-keys map of given db. */ void slotToKeyInit(redisDb *db) { db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping)); + clusterDictMetadata *dictmeta = dictMetadata(db->dict); + dictmeta->db = db; } /* Empty slots-keys map of given db. */ diff --git a/src/cluster.h b/src/cluster.h index 4c93dbc8d..b76a9a7cd 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -169,6 +169,9 @@ typedef struct clusterDictEntryMetadata { dictEntry *next; /* Next entry with key in the same slot */ } clusterDictEntryMetadata; +typedef struct { + redisDb *db; /* A link back to the db this dict belongs to */ +} clusterDictMetadata; typedef struct clusterState { clusterNode *myself; /* This node */ @@ -409,7 +412,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded); unsigned int keyHashSlot(char *key, int keylen); void slotToKeyAddEntry(dictEntry *entry, redisDb *db); void slotToKeyDelEntry(dictEntry *entry, redisDb *db); -void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db); +void slotToKeyReplaceEntry(dict *d, dictEntry *entry); void slotToKeyInit(redisDb *db); void slotToKeyFlush(redisDb *db); void slotToKeyDestroy(redisDb *db); @@ -939,7 +939,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { privdata[0] = keys; privdata[1] = o; do { - cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); + cursor = dictScan(ht, cursor, scanCallback, privdata); } while (cursor && maxiterations-- && listLength(keys) < (unsigned long)count); diff --git a/src/defrag.c b/src/defrag.c index 8839e1107..0d6c6b45a 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -45,10 +45,6 @@ * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr); -/* forward declarations*/ -void defragDictBucketCallback(dict *d, dictEntry **bucketref); -dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); - /* Defrag helper for generic allocations. * * returns NULL in case the allocation wasn't moved. @@ -289,8 +285,8 @@ long activeDefragSdsDict(dict* d, int val_type) { activeDefragSdsDictData data = {d, val_type, 0}; unsigned long cursor = 0; do { - cursor = dictScan(d, cursor, activeDefragSdsDictCallback, - defragDictBucketCallback, &data); + cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback, + activeDefragAlloc, &data); } while (cursor != 0); return data.defragged; } @@ -329,29 +325,6 @@ long activeDefragList(list *l, int val_type) { return defragged; } -/* Utility function that replaces an old key pointer in the dictionary with a - * new pointer. Additionally, we try to defrag the dictEntry in that dict. - * Oldkey may be a dead pointer and should not be accessed (we get a - * pre-calculated hash value). Newkey may be null if the key pointer wasn't - * moved. Return value is the dictEntry if found, or NULL if not found. - * NOTE: this is very ugly code, but it let's us avoid the complication of - * doing a scan on another dict. */ -dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { - dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); - if (deref) { - dictEntry *de = *deref; - dictEntry *newde = activeDefragAlloc(de); - if (newde) { - de = *deref = newde; - (*defragged)++; - } - if (newkey) - dictSetKey(d, de, newkey); - return de; - } - return NULL; -} - long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; long defragged = 0; @@ -453,7 +426,7 @@ long scanLaterZset(robj *ob, unsigned long *cursor) { zset *zs = (zset*)ob->ptr; dict *d = zs->dict; scanLaterZsetData data = {zs, 0}; - *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); + *cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, activeDefragAlloc, &data); return data.defragged; } @@ -476,7 +449,7 @@ long scanLaterSet(robj *ob, unsigned long *cursor) { return 0; dict *d = ob->ptr; scanLaterDictData data = {d, 0}; - *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &data); + *cursor = dictScanDefrag(d, *cursor, scanLaterSetCallback, activeDefragAlloc, &data); return data.defragged; } @@ -497,7 +470,7 @@ long scanLaterHash(robj *ob, unsigned long *cursor) { return 0; dict *d = ob->ptr; scanLaterDictData data = {d, 0}; - *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &data); + *cursor = dictScanDefrag(d, *cursor, scanLaterHashCallback, activeDefragAlloc, &data); return data.defragged; } @@ -777,14 +750,17 @@ long defragKey(redisDb *db, dictEntry *de) { /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); - if (newsds) - defragged++, dictSetKey(db->dict, de, newsds); - if (dictSize(db->expires)) { - /* Dirty code: - * I can't search in db->expires for that key after i already released - * the pointer it holds it won't be able to do the string compare */ - uint64_t hash = dictGetHash(db->dict, dictGetKey(de)); - replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); + if (newsds) { + defragged++; + dictSetKey(db->dict, de, newsds); + if (dictSize(db->expires)) { + /* We can't search in db->expires for that key after we've released + * the pointer it holds, since it won't be able to do the string + * compare, but we can find the entry using key hash and pointer. */ + uint64_t hash = dictGetHash(db->dict, newsds); + dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires, keysds, hash); + if (expire_de) dictSetKey(db->expires, expire_de, newsds); + } } /* Try to defrag robj and / or string value. */ @@ -856,20 +832,12 @@ void defragScanCallback(void *privdata, const dictEntry *de) { server.stat_active_defrag_scanned++; } -/* Defrag scan callback for each hash table bucket, - * used in order to defrag the dictEntry allocations. */ -void defragDictBucketCallback(dict *d, dictEntry **bucketref) { - while (bucketref && *bucketref) { - dictEntry *de = *bucketref, *newde; - if ((newde = activeDefragAlloc(de))) { - *bucketref = newde; - if (server.cluster_enabled && d == server.db[0].dict) { - /* Cluster keyspace dict. Update slot-to-entries mapping. */ - slotToKeyReplaceEntry(newde, server.db); - } - } - bucketref = dictGetNextRef(*bucketref); - } +/* Dummy scan callback used when defragging the expire dictionary. We only + * defrag the entries, which is done per bucket. */ +void defragExpireScanCallback(void *privdata, const dictEntry *de) { + UNUSED(privdata); + UNUSED(de); + server.stat_active_defrag_scanned++; } /* Utility function to get the fragmentation ratio from jemalloc. @@ -1037,6 +1005,7 @@ void computeDefragCycles() { void activeDefragCycle(void) { static int current_db = -1; static unsigned long cursor = 0; + static unsigned long expires_cursor = 0; static redisDb *db = NULL; static long long start_scan, start_stat; unsigned int iterations = 0; @@ -1082,7 +1051,7 @@ void activeDefragCycle(void) { do { /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!cursor) { + if (!cursor && !expires_cursor) { /* finish any leftovers from previous db before moving to the next one */ if (db && defragLaterStep(db, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ @@ -1129,16 +1098,32 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db); + /* Scan the keyspace dict unless we're scanning the expire dict. */ + if (!expires_cursor) + cursor = dictScanDefrag(db->dict, + cursor, + defragScanCallback, + activeDefragAlloc, + db); + + /* When done scanning the keyspace dict, we scan the expire dict. */ + if (!cursor) + expires_cursor = dictScanDefrag(db->expires, + expires_cursor, + defragExpireScanCallback, + activeDefragAlloc, + NULL); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehashing), * check if we reached the time limit. * But regardless, don't start a new db in this loop, this is because after * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (!cursor || (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { + if (!(cursor || expires_cursor) || + ++iterations > 16 || + server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) + { if (!cursor || ustime() > endtime) { quit = 1; break; @@ -1147,7 +1132,7 @@ void activeDefragCycle(void) { prev_defragged = server.stat_active_defrag_hits; prev_scanned = server.stat_active_defrag_scanned; } - } while(cursor && !quit); + } while((cursor || expires_cursor) && !quit); } while(!quit); latencyEndMonitor(latency); diff --git a/src/dict.c b/src/dict.c index 193f619b7..ec2d4dca0 100644 --- a/src/dict.c +++ b/src/dict.c @@ -120,7 +120,11 @@ static void _dictReset(dict *d, int htidx) /* Create a new hash table */ dict *dictCreate(dictType *type) { - dict *d = zmalloc(sizeof(*d)); + size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes() : 0; + dict *d = zmalloc(sizeof(*d) + metasize); + if (metasize) { + memset(dictMetadata(d), 0, metasize); + } _dictInit(d,type); return d; @@ -319,6 +323,11 @@ static void _dictRehashStep(dict *d) { if (d->pauserehash == 0) dictRehash(d,1); } +/* Return a pointer to the metadata section within the dict. */ +void *dictMetadata(dict *d) { + return &d->metadata; +} + /* Add an element to the target hash table */ int dictAdd(dict *d, void *key, void *val) { @@ -365,10 +374,10 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) * system it is more likely that recently added entries are accessed * more frequently. */ htidx = dictIsRehashing(d) ? 1 : 0; - size_t metasize = dictMetadataSize(d); + size_t metasize = dictEntryMetadataSize(d); entry = zmalloc(sizeof(*entry) + metasize); if (metasize > 0) { - memset(dictMetadata(entry), 0, metasize); + memset(dictEntryMetadata(entry), 0, metasize); } entry->next = d->ht_table[htidx][index]; d->ht_table[htidx][index] = entry; @@ -648,7 +657,7 @@ double dictIncrDoubleVal(dictEntry *de, double val) { } /* A pointer to the metadata section within the dict entry. */ -void *dictMetadata(dictEntry *de) { +void *dictEntryMetadata(dictEntry *de) { return &de->metadata; } @@ -677,17 +686,6 @@ double *dictGetDoubleValPtr(dictEntry *de) { return &de->v.d; } -/* The next entry in same hash bucket. */ -dictEntry *dictGetNext(const dictEntry *de) { - return de->next; -} - -/* A pointer to the 'next' field within the entry, or NULL if there is no next - * field. */ -dictEntry **dictGetNextRef(dictEntry *de) { - return &de->next; -} - /* Returns the memory usage in bytes of the dict, excluding the size of the keys * and values. */ size_t dictMemUsage(const dict *d) { @@ -949,6 +947,21 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) { return stored; } + +/* Reallocate the dictEntry allocations in a bucket using the provided + * allocation function in order to defrag them. */ +static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragAllocFunction *allocfn) { + while (bucketref && *bucketref) { + dictEntry *de = *bucketref, *newde; + if ((newde = allocfn(de))) { + *bucketref = newde; + if (d->type->afterReplaceEntry) + d->type->afterReplaceEntry(d, newde); + } + bucketref = &(*bucketref)->next; + } +} + /* This is like dictGetRandomKey() from the POV of the API, but will do more * work to ensure a better distribution of the returned element. * @@ -1072,9 +1085,25 @@ static unsigned long rev(unsigned long v) { unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, - dictScanBucketFunction* bucketfn, void *privdata) { + return dictScanDefrag(d, v, fn, NULL, privdata); +} + +/* Like dictScan, but additionally reallocates the memory used by the dict + * entries using the provided allocation function. This feature was added for + * the active defrag feature. + * + * The 'defracallocfn' callback is called with a pointer to memory that callback + * can reallocate. The callback should return a new memory address or NULL, + * where NULL means that no reallocation happened and the old memory is still + * valid. */ +unsigned long dictScanDefrag(dict *d, + unsigned long v, + dictScanFunction *fn, + dictDefragAllocFunction *defragallocfn, + void *privdata) +{ int htidx0, htidx1; const dictEntry *de, *next; unsigned long m0, m1; @@ -1089,7 +1118,9 @@ unsigned long dictScan(dict *d, m0 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx0]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn); + } de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -1120,7 +1151,9 @@ unsigned long dictScan(dict *d, m1 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx1]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn); + } de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -1132,7 +1165,9 @@ unsigned long dictScan(dict *d, * of the index pointed to by the cursor in the smaller table */ do { /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx1][v & m1]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx1][v & m1], defragallocfn); + } de = d->ht_table[htidx1][v & m1]; while (de) { next = de->next; @@ -1253,25 +1288,23 @@ uint64_t dictGetHash(dict *d, const void *key) { return dictHashKey(d, key); } -/* Finds the dictEntry reference by using pointer and pre-calculated hash. +/* Finds the dictEntry using pointer and pre-calculated hash. * oldkey is a dead pointer and should not be accessed. * the hash value should be provided using dictGetHash. * no string / key comparison is performed. - * return value is the reference to the dictEntry if found, or NULL if not found. */ -dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) { - dictEntry *he, **heref; + * return value is a pointer to the dictEntry if found, or NULL if not found. */ +dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) { + dictEntry *he; unsigned long idx, table; if (dictSize(d) == 0) return NULL; /* dict is empty */ for (table = 0; table <= 1; table++) { idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); - heref = &d->ht_table[table][idx]; - he = *heref; + he = d->ht_table[table][idx]; while(he) { if (oldptr==he->key) - return heref; - heref = &he->next; - he = *heref; + return he; + he = he->next; } if (!dictIsRehashing(d)) return NULL; } diff --git a/src/dict.h b/src/dict.h index 99ff3b769..a8a76e69c 100644 --- a/src/dict.h +++ b/src/dict.h @@ -56,9 +56,13 @@ typedef struct dictType { void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); - /* Allow a dictEntry to carry extra caller-defined metadata. The - * extra memory is initialized to 0 when a dictEntry is allocated. */ + /* Allow each dict and dictEntry to carry extra caller-defined metadata. The + * extra memory is initialized to 0 when allocated. */ size_t (*dictEntryMetadataBytes)(dict *d); + size_t (*dictMetadataBytes)(void); + /* Optional callback called after an entry has been reallocated (due to + * active defrag). Only called if the entry has metadata. */ + void (*afterReplaceEntry)(dict *d, dictEntry *entry); } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp)) @@ -75,6 +79,10 @@ struct dict { /* Keep small vars at end for optimal (minimal) struct padding */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */ + + void *metadata[]; /* An arbitrary number of bytes (starting at a + * pointer-aligned address) of size as defined + * by dictType's dictEntryBytes. */ }; /* If safe is set to 1 this is a safe iterator, that means, you can call @@ -91,7 +99,7 @@ typedef struct dictIterator { } dictIterator; typedef void (dictScanFunction)(void *privdata, const dictEntry *de); -typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref); +typedef void *(dictDefragAllocFunction)(void *ptr); /* This is the initial size of every hash table */ #define DICT_HT_INITIAL_EXP 2 @@ -112,8 +120,10 @@ typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref); (d)->type->keyCompare((d), key1, key2) : \ (key1) == (key2)) -#define dictMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ - ? (d)->type->dictEntryMetadataBytes(d) : 0) +#define dictEntryMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ + ? (d)->type->dictEntryMetadataBytes(d) : 0) +#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \ + ? (d)->type->dictMetadataBytes() : 0) #define dictHashKey(d, key) ((d)->type->hashFunction(key)) #define dictSlots(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])) @@ -139,6 +149,7 @@ typedef enum { dict *dictCreate(dictType *type); int dictExpand(dict *d, unsigned long size); int dictTryExpand(dict *d, unsigned long size); +void *dictMetadata(dict *d); int dictAdd(dict *d, void *key, void *val); dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing); dictEntry *dictAddOrFind(dict *d, void *key); @@ -160,15 +171,13 @@ void dictSetDoubleVal(dictEntry *de, double val); int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val); uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val); double dictIncrDoubleVal(dictEntry *de, double val); -void *dictMetadata(dictEntry *de); +void *dictEntryMetadata(dictEntry *de); void *dictGetKey(const dictEntry *de); void *dictGetVal(const dictEntry *de); int64_t dictGetSignedIntegerVal(const dictEntry *de); uint64_t dictGetUnsignedIntegerVal(const dictEntry *de); double dictGetDoubleVal(const dictEntry *de); double *dictGetDoubleValPtr(dictEntry *de); -dictEntry *dictGetNext(const dictEntry *de); -dictEntry **dictGetNextRef(dictEntry *de); size_t dictMemUsage(const dict *d); size_t dictEntryMemUsage(void); dictIterator *dictGetIterator(dict *d); @@ -190,9 +199,10 @@ int dictRehash(dict *d, int n); int dictRehashMilliseconds(dict *d, int ms); void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); -unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata); +unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata); +unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragAllocFunction *allocfn, void *privdata); uint64_t dictGetHash(dict *d, const void *key); -dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); +dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); #ifdef REDIS_TEST int dictTest(int argc, char *argv[], int flags); diff --git a/src/expire.c b/src/expire.c index e357b9986..d07e22f66 100644 --- a/src/expire.c +++ b/src/expire.c @@ -271,7 +271,7 @@ void activeExpireCycle(int type) { while (data.sampled < num && checked_buckets < max_buckets) { db->expires_cursor = dictScan(db->expires, db->expires_cursor, - expireScanCallback, NULL, &data); + expireScanCallback, &data); checked_buckets++; } total_expired += data.expired; diff --git a/src/module.c b/src/module.c index cc6934dca..b745e8ee4 100644 --- a/src/module.c +++ b/src/module.c @@ -10336,7 +10336,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data); + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, &data); if (cursor->cursor == 0) { cursor->done = 1; ret = 0; @@ -10448,7 +10448,7 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc int ret = 1; if (ht) { ScanKeyCBData data = { key, privdata, fn }; - cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data); + cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, &data); if (cursor->cursor == 0) { cursor->done = 1; ret = 0; diff --git a/src/object.c b/src/object.c index ca9a24088..67e372032 100644 --- a/src/object.c +++ b/src/object.c @@ -1252,7 +1252,8 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mem_total+=mem; /* Account for the slot to keys map in cluster mode */ - mem = dictSize(db->dict) * dictMetadataSize(db->dict); + mem = dictSize(db->dict) * dictEntryMetadataSize(db->dict) + + dictMetadataSize(db->dict); mh->db[mh->num_dbs].overhead_ht_slot_to_keys = mem; mem_total+=mem; diff --git a/src/server.c b/src/server.c index ae83ec25c..910f299f3 100644 --- a/src/server.c +++ b/src/server.c @@ -399,13 +399,24 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) { /* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the * metadata is used for constructing a doubly linked list of the dict entries * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */ -size_t dictEntryMetadataSize(dict *d) { +size_t dbDictEntryMetadataSize(dict *d) { UNUSED(d); /* NOTICE: this also affects overhead_ht_slot_to_keys in getMemoryOverheadData. * If we ever add non-cluster related data here, that code must be modified too. */ return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0; } +/* Returns the size of the DB dict metadata in bytes. In cluster mode, we store + * a pointer to the db in the main db dict, used for updating the slot-to-key + * mapping when a dictEntry is reallocated. */ +size_t dbDictMetadataSize(void) { + return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0; +} + +void dbDictAfterReplaceEntry(dict *d, dictEntry *de) { + if (server.cluster_enabled) slotToKeyReplaceEntry(d, de); +} + /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. */ dictType objectKeyPointerValueDictType = { @@ -460,7 +471,9 @@ dictType dbDictType = { dictSdsDestructor, /* key destructor */ dictObjectDestructor, /* val destructor */ dictExpandAllowed, /* allow to expand */ - dictEntryMetadataSize /* size of entry metadata in bytes */ + dbDictEntryMetadataSize, /* size of entry metadata in bytes */ + dbDictMetadataSize, /* size of dict metadata in bytes */ + dbDictAfterReplaceEntry /* notify entry moved/reallocated */ }; /* Db->expires */ |