summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server.c4
-rw-r--r--src/server.h2
-rw-r--r--src/tracking.c192
3 files changed, 84 insertions, 114 deletions
diff --git a/src/server.c b/src/server.c
index f87dfba46..df6a8c1f4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -4221,7 +4221,7 @@ sds genRedisInfoString(const char *section) {
"active_defrag_misses:%lld\r\n"
"active_defrag_key_hits:%lld\r\n"
"active_defrag_key_misses:%lld\r\n"
- "tracking_used_slots:%lld\r\n",
+ "tracking_tracked_keys:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -4249,7 +4249,7 @@ sds genRedisInfoString(const char *section) {
server.stat_active_defrag_misses,
server.stat_active_defrag_key_hits,
server.stat_active_defrag_key_misses,
- trackingGetUsedSlots());
+ (unsigned long long) trackingGetTotalItems());
}
/* Replication */
diff --git a/src/server.h b/src/server.h
index dfbc15ac3..a30db6b57 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1654,7 +1654,7 @@ void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
-unsigned long long trackingGetUsedSlots(void);
+uint64_t trackingGetTotalItems(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
diff --git a/src/tracking.c b/src/tracking.c
index acb97800a..ecb7fbdcc 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -30,37 +30,22 @@
#include "server.h"
-/* The tracking table is constituted by 2^24 radix trees (each tree, and the
- * table itself, are allocated in a lazy way only when needed) tracking
- * clients that may have certain keys in their local, client side, cache.
- *
- * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
- * slots, however here the function we use is crc64, taking the least
- * significant 24 bits of the output.
+/* The tracking table is constituted by a radix tree of keys, each pointing
+ * to a radix tree of client IDs, used to track the clients that may have
+ * certain keys in their local, client side, cache.
*
* When a client enables tracking with "CLIENT TRACKING on", each key served to
- * the client is hashed to one of such slots, and Redis will remember what
- * client may have keys about such slot. Later, when a key in a given slot is
- * modified, all the clients that may have local copies of keys in that slot
- * will receive an invalidation message. There is no distinction of database
- * number: a single table is used.
+ * the client is remembered in the table mapping the keys to the client IDs.
+ * Later, when a key is modified, all the clients that may have local copy
+ * of such key will receive an invalidation message.
*
* Clients will normally take frequently requested objects in memory, removing
- * them when invalidation messages are received. A strategy clients may use is
- * to just cache objects in a dictionary, associating to each cached object
- * some incremental epoch, or just a timestamp. When invalidation messages are
- * received clients may store, in a different table, the timestamp (or epoch)
- * of the invalidation of such given slot: later when accessing objects, the
- * eviction of stale objects may be performed in a lazy way by checking if the
- * cached object timestamp is older than the invalidation timestamp for such
- * objects.
- *
- * The output of the 24 bit hash function is very large (more than 16 million
- * possible slots), so clients that may want to use less resources may only
- * use the most significant bits instead of the full 24 bits. */
-#define TRACKING_TABLE_SIZE (1<<24)
-rax **TrackingTable = NULL;
-unsigned long TrackingTableUsedSlots = 0;
+ * them when invalidation messages are received. */
+rax *TrackingTable = NULL;
+uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
+ the whole tracking table. This givesn
+ an hint about the total memory we
+ are using server side for CSC. */
robj *TrackingChannelName;
/* Remove the tracking state from the client 'c'. Note that there is not much
@@ -90,7 +75,7 @@ void enableTracking(client *c, uint64_t redirect_to) {
c->client_tracking_redirection = redirect_to;
server.tracking_clients++;
if (TrackingTable == NULL) {
- TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
+ TrackingTable = raxNew();
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
}
@@ -108,19 +93,20 @@ void trackingRememberKeys(client *c) {
for(int j = 0; j < numkeys; j++) {
int idx = keys[j];
sds sdskey = c->argv[idx]->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable[hash] == NULL) {
- TrackingTable[hash] = raxNew();
- TrackingTableUsedSlots++;
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) {
+ ids = raxNew();
+ int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
+ sdslen(sdskey),ids, NULL);
+ serverAssert(inserted == 1);
}
- raxTryInsert(TrackingTable[hash],
- (unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
+ if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
+ TrackingTableTotalItems++;
}
getKeysFreeResult(keys);
}
-void sendTrackingMessage(client *c, long long hash) {
+void sendTrackingMessage(client *c, char *keyname, size_t keylen) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@@ -146,49 +132,44 @@ void sendTrackingMessage(client *c, long long hash) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
+ addReplyBulkCBuffer(c,keyname,keylen);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
+ /* We use a static object to speedup things, however we assume
+ * that addReplyPubsubMessage() will not take a reference. */
+ robj keyobj;
+ initStaticStringObject(keyobj,keyname);
+ addReplyPubsubMessage(c,TrackingChannelName,&keyobj);
+ serverAssert(keyobj.refcount == 1);
}
}
-/* Invalidates a caching slot: this is actually the low level implementation
- * of the API that Redis calls externally, that is trackingInvalidateKey(). */
-void trackingInvalidateSlot(uint64_t slot) {
- if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of keys tracking, our task here is
+ * to send a notification to every client that may have keys about such caching
+ * slot. */
+void trackingInvalidateKey(robj *keyobj) {
+ if (TrackingTable == NULL) return;
+ sds sdskey = keyobj->ptr;
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) return;;
raxIterator ri;
- raxStart(&ri,TrackingTable[slot]);
+ raxStart(&ri,ids);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,sizeof(id));
client *c = lookupClientByID(id);
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
- sendTrackingMessage(c,slot);
+ sendTrackingMessage(c,sdskey,sdslen(sdskey));
}
raxStop(&ri);
/* Free the tracking table: we'll create the radix tree and populate it
* again if more keys will be modified in this caching slot. */
- raxFree(TrackingTable[slot]);
- TrackingTable[slot] = NULL;
- TrackingTableUsedSlots--;
-}
-
-/* This function is called from signalModifiedKey() or other places in Redis
- * when a key changes value. In the context of keys tracking, our task here is
- * to send a notification to every client that may have keys about such caching
- * slot. */
-void trackingInvalidateKey(robj *keyobj) {
- if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
-
- sds sdskey = keyobj->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- trackingInvalidateSlot(hash);
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
}
/* This function is called when one or all the Redis databases are flushed
@@ -205,6 +186,10 @@ void trackingInvalidateKey(robj *keyobj) {
* we just send the invalidation message to all the clients, but don't
* flush the table: it will slowly get garbage collected as more keys
* are modified in the used caching slots. */
+void freeTrackingRadixTree(void *rt) {
+ raxFree(rt);
+}
+
void trackingInvalidateKeysOnFlush(int dbid) {
if (server.tracking_clients) {
listNode *ln;
@@ -213,84 +198,69 @@ void trackingInvalidateKeysOnFlush(int dbid) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
- sendTrackingMessage(c,-1);
+ sendTrackingMessage(c,"",1);
}
}
}
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) {
- for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
- if (TrackingTable[j] != NULL) {
- raxFree(TrackingTable[j]);
- TrackingTable[j] = NULL;
- TrackingTableUsedSlots--;
- }
- }
-
- /* If there are no clients with tracking enabled, we can even
- * reclaim the memory used by the table itself. The code assumes
- * the table is allocated only if there is at least one client alive
- * with tracking enabled. */
- if (server.tracking_clients == 0) {
- zfree(TrackingTable);
- TrackingTable = NULL;
- }
+ raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
+ TrackingTableTotalItems = 0;
}
}
/* Tracking forces Redis to remember information about which client may have
- * keys about certian caching slots. In workloads where there are a lot of
- * reads, but keys are hardly modified, the amount of information we have
- * to remember server side could be a lot: for each 16 millions of caching
- * slots we may end with a radix tree containing many entries.
+ * certain keys. In workloads where there are a lot of reads, but keys are
+ * hardly modified, the amount of information we have to remember server side
+ * could be a lot, with the number of keys being totally not bound.
*
- * So Redis allows the user to configure a maximum fill rate for the
+ * So Redis allows the user to configure a maximum number of keys for the
* invalidation table. This function makes sure that we don't go over the
* specified fill rate: if we are over, we can just evict informations about
- * random caching slots, and send invalidation messages to clients like if
- * the key was modified. */
+ * a random key, and send invalidation messages to clients like if the key was
+ * modified. */
void trackingLimitUsedSlots(void) {
static unsigned int timeout_counter = 0;
-
+ if (TrackingTable == NULL) return;
if (server.tracking_table_max_fill == 0) return; /* No limits set. */
- unsigned int max_slots =
- (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
- if (TrackingTableUsedSlots <= max_slots) {
+ size_t max_keys = server.tracking_table_max_fill;
+ if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
return; /* Limit not reached. */
}
- /* We have to invalidate a few slots to reach the limit again. The effort
+ /* We have to invalidate a few keys to reach the limit again. The effort
* we do here is proportional to the number of times we entered this
* function and found that we are still over the limit. */
int effort = 100 * (timeout_counter+1);
- /* Let's start at a random position, and perform linear probing, in order
- * to improve cache locality. However once we are able to find an used
- * slot, jump again randomly, in order to avoid creating big holes in the
- * table (that will make this funciton use more resourced later). */
+ /* We just remove one key after another by using a random walk. */
+ raxIterator ri;
+ raxStart(&ri,TrackingTable);
while(effort > 0) {
- unsigned int idx = rand() % TRACKING_TABLE_SIZE;
- do {
- effort--;
- idx = (idx+1) % TRACKING_TABLE_SIZE;
- if (TrackingTable[idx] != NULL) {
- trackingInvalidateSlot(idx);
- if (TrackingTableUsedSlots <= max_slots) {
- timeout_counter = 0;
- return; /* Return ASAP: we are again under the limit. */
- } else {
- break; /* Jump to next random position. */
- }
- }
- } while(effort > 0);
+ effort--;
+ raxSeek(&ri,"^",NULL,0);
+ raxRandomWalk(&ri,0);
+ rax *ids = ri.data;
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
+ if (raxSize(TrackingTable) <= max_keys) {
+ timeout_counter = 0;
+ raxStop(&ri);
+ return; /* Return ASAP: we are again under the limit. */
+ }
}
+
+ /* If we reach this point, we were not able to go under the configured
+ * limit using the maximum effort we had for this run. */
+ raxStop(&ri);
timeout_counter++;
}
/* This is just used in order to access the amount of used slots in the
* tracking table. */
-unsigned long long trackingGetUsedSlots(void) {
- return TrackingTableUsedSlots;
+uint64_t trackingGetTotalItems(void) {
+ return TrackingTableTotalItems;
}