diff options
-rw-r--r-- | src/server.c | 4 | ||||
-rw-r--r-- | src/server.h | 2 | ||||
-rw-r--r-- | src/tracking.c | 192 |
3 files changed, 84 insertions, 114 deletions
diff --git a/src/server.c b/src/server.c index f87dfba46..df6a8c1f4 100644 --- a/src/server.c +++ b/src/server.c @@ -4221,7 +4221,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" "active_defrag_key_misses:%lld\r\n" - "tracking_used_slots:%lld\r\n", + "tracking_tracked_keys:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4249,7 +4249,7 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, server.stat_active_defrag_key_misses, - trackingGetUsedSlots()); + (unsigned long long) trackingGetTotalItems()); } /* Replication */ diff --git a/src/server.h b/src/server.h index dfbc15ac3..a30db6b57 100644 --- a/src/server.h +++ b/src/server.h @@ -1654,7 +1654,7 @@ void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); -unsigned long long trackingGetUsedSlots(void); +uint64_t trackingGetTotalItems(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index acb97800a..ecb7fbdcc 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -30,37 +30,22 @@ #include "server.h" -/* The tracking table is constituted by 2^24 radix trees (each tree, and the - * table itself, are allocated in a lazy way only when needed) tracking - * clients that may have certain keys in their local, client side, cache. - * - * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash - * slots, however here the function we use is crc64, taking the least - * significant 24 bits of the output. +/* The tracking table is constituted by a radix tree of keys, each pointing + * to a radix tree of client IDs, used to track the clients that may have + * certain keys in their local, client side, cache. * * When a client enables tracking with "CLIENT TRACKING on", each key served to - * the client is hashed to one of such slots, and Redis will remember what - * client may have keys about such slot. Later, when a key in a given slot is - * modified, all the clients that may have local copies of keys in that slot - * will receive an invalidation message. There is no distinction of database - * number: a single table is used. + * the client is remembered in the table mapping the keys to the client IDs. + * Later, when a key is modified, all the clients that may have local copy + * of such key will receive an invalidation message. * * Clients will normally take frequently requested objects in memory, removing - * them when invalidation messages are received. A strategy clients may use is - * to just cache objects in a dictionary, associating to each cached object - * some incremental epoch, or just a timestamp. When invalidation messages are - * received clients may store, in a different table, the timestamp (or epoch) - * of the invalidation of such given slot: later when accessing objects, the - * eviction of stale objects may be performed in a lazy way by checking if the - * cached object timestamp is older than the invalidation timestamp for such - * objects. - * - * The output of the 24 bit hash function is very large (more than 16 million - * possible slots), so clients that may want to use less resources may only - * use the most significant bits instead of the full 24 bits. */ -#define TRACKING_TABLE_SIZE (1<<24) -rax **TrackingTable = NULL; -unsigned long TrackingTableUsedSlots = 0; + * them when invalidation messages are received. */ +rax *TrackingTable = NULL; +uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across + the whole tracking table. This givesn + an hint about the total memory we + are using server side for CSC. */ robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much @@ -90,7 +75,7 @@ void enableTracking(client *c, uint64_t redirect_to) { c->client_tracking_redirection = redirect_to; server.tracking_clients++; if (TrackingTable == NULL) { - TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); + TrackingTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } } @@ -108,19 +93,20 @@ void trackingRememberKeys(client *c) { for(int j = 0; j < numkeys; j++) { int idx = keys[j]; sds sdskey = c->argv[idx]->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) { - TrackingTable[hash] = raxNew(); - TrackingTableUsedSlots++; + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) { + ids = raxNew(); + int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, + sdslen(sdskey),ids, NULL); + serverAssert(inserted == 1); } - raxTryInsert(TrackingTable[hash], - (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); + if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL)) + TrackingTableTotalItems++; } getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, long long hash) { +void sendTrackingMessage(client *c, char *keyname, size_t keylen) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -146,49 +132,44 @@ void sendTrackingMessage(client *c, long long hash) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); + addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); + /* We use a static object to speedup things, however we assume + * that addReplyPubsubMessage() will not take a reference. */ + robj keyobj; + initStaticStringObject(keyobj,keyname); + addReplyPubsubMessage(c,TrackingChannelName,&keyobj); + serverAssert(keyobj.refcount == 1); } } -/* Invalidates a caching slot: this is actually the low level implementation - * of the API that Redis calls externally, that is trackingInvalidateKey(). */ -void trackingInvalidateSlot(uint64_t slot) { - if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; +/* This function is called from signalModifiedKey() or other places in Redis + * when a key changes value. In the context of keys tracking, our task here is + * to send a notification to every client that may have keys about such caching + * slot. */ +void trackingInvalidateKey(robj *keyobj) { + if (TrackingTable == NULL) return; + sds sdskey = keyobj->ptr; + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + if (ids == raxNotFound) return;; raxIterator ri; - raxStart(&ri,TrackingTable[slot]); + raxStart(&ri,ids); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,slot); + sendTrackingMessage(c,sdskey,sdslen(sdskey)); } raxStop(&ri); /* Free the tracking table: we'll create the radix tree and populate it * again if more keys will be modified in this caching slot. */ - raxFree(TrackingTable[slot]); - TrackingTable[slot] = NULL; - TrackingTableUsedSlots--; -} - -/* This function is called from signalModifiedKey() or other places in Redis - * when a key changes value. In the context of keys tracking, our task here is - * to send a notification to every client that may have keys about such caching - * slot. */ -void trackingInvalidateKey(robj *keyobj) { - if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return; - - sds sdskey = keyobj->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - trackingInvalidateSlot(hash); + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); } /* This function is called when one or all the Redis databases are flushed @@ -205,6 +186,10 @@ void trackingInvalidateKey(robj *keyobj) { * we just send the invalidation message to all the clients, but don't * flush the table: it will slowly get garbage collected as more keys * are modified in the used caching slots. */ +void freeTrackingRadixTree(void *rt) { + raxFree(rt); +} + void trackingInvalidateKeysOnFlush(int dbid) { if (server.tracking_clients) { listNode *ln; @@ -213,84 +198,69 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,-1); + sendTrackingMessage(c,"",1); } } } /* In case of FLUSHALL, reclaim all the memory used by tracking. */ if (dbid == -1 && TrackingTable) { - for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) { - if (TrackingTable[j] != NULL) { - raxFree(TrackingTable[j]); - TrackingTable[j] = NULL; - TrackingTableUsedSlots--; - } - } - - /* If there are no clients with tracking enabled, we can even - * reclaim the memory used by the table itself. The code assumes - * the table is allocated only if there is at least one client alive - * with tracking enabled. */ - if (server.tracking_clients == 0) { - zfree(TrackingTable); - TrackingTable = NULL; - } + raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + TrackingTableTotalItems = 0; } } /* Tracking forces Redis to remember information about which client may have - * keys about certian caching slots. In workloads where there are a lot of - * reads, but keys are hardly modified, the amount of information we have - * to remember server side could be a lot: for each 16 millions of caching - * slots we may end with a radix tree containing many entries. + * certain keys. In workloads where there are a lot of reads, but keys are + * hardly modified, the amount of information we have to remember server side + * could be a lot, with the number of keys being totally not bound. * - * So Redis allows the user to configure a maximum fill rate for the + * So Redis allows the user to configure a maximum number of keys for the * invalidation table. This function makes sure that we don't go over the * specified fill rate: if we are over, we can just evict informations about - * random caching slots, and send invalidation messages to clients like if - * the key was modified. */ + * a random key, and send invalidation messages to clients like if the key was + * modified. */ void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; - + if (TrackingTable == NULL) return; if (server.tracking_table_max_fill == 0) return; /* No limits set. */ - unsigned int max_slots = - (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill; - if (TrackingTableUsedSlots <= max_slots) { + size_t max_keys = server.tracking_table_max_fill; + if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ } - /* We have to invalidate a few slots to reach the limit again. The effort + /* We have to invalidate a few keys to reach the limit again. The effort * we do here is proportional to the number of times we entered this * function and found that we are still over the limit. */ int effort = 100 * (timeout_counter+1); - /* Let's start at a random position, and perform linear probing, in order - * to improve cache locality. However once we are able to find an used - * slot, jump again randomly, in order to avoid creating big holes in the - * table (that will make this funciton use more resourced later). */ + /* We just remove one key after another by using a random walk. */ + raxIterator ri; + raxStart(&ri,TrackingTable); while(effort > 0) { - unsigned int idx = rand() % TRACKING_TABLE_SIZE; - do { - effort--; - idx = (idx+1) % TRACKING_TABLE_SIZE; - if (TrackingTable[idx] != NULL) { - trackingInvalidateSlot(idx); - if (TrackingTableUsedSlots <= max_slots) { - timeout_counter = 0; - return; /* Return ASAP: we are again under the limit. */ - } else { - break; /* Jump to next random position. */ - } - } - } while(effort > 0); + effort--; + raxSeek(&ri,"^",NULL,0); + raxRandomWalk(&ri,0); + rax *ids = ri.data; + TrackingTableTotalItems -= raxSize(ids); + raxFree(ids); + raxRemove(TrackingTable,ri.key,ri.key_len,NULL); + if (raxSize(TrackingTable) <= max_keys) { + timeout_counter = 0; + raxStop(&ri); + return; /* Return ASAP: we are again under the limit. */ + } } + + /* If we reach this point, we were not able to go under the configured + * limit using the maximum effort we had for this run. */ + raxStop(&ri); timeout_counter++; } /* This is just used in order to access the amount of used slots in the * tracking table. */ -unsigned long long trackingGetUsedSlots(void) { - return TrackingTableUsedSlots; +uint64_t trackingGetTotalItems(void) { + return TrackingTableTotalItems; } |