summaryrefslogtreecommitdiff
path: root/src/tracking.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-02-14 14:29:18 +0100
committerantirez <antirez@gmail.com>2020-02-14 14:29:18 +0100
commit090bc0c1a37cfba092102527524ee2a3023d0481 (patch)
treebd6da2a89dd6726b2e23ea7da2dbbcc305c1a568 /src/tracking.c
parentc21c23bbba0989e84cb145163ea9ecd76f2db8fe (diff)
parent8ea7a3ee686a8bddf0b07585922917adcfda91dc (diff)
downloadredis-090bc0c1a37cfba092102527524ee2a3023d0481.tar.gz
Merge branch 'csc2' into unstable
Diffstat (limited to 'src/tracking.c')
-rw-r--r--src/tracking.c383
1 files changed, 266 insertions, 117 deletions
diff --git a/src/tracking.c b/src/tracking.c
index acb97800a..3333472a4 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -30,39 +30,34 @@
#include "server.h"
-/* The tracking table is constituted by 2^24 radix trees (each tree, and the
- * table itself, are allocated in a lazy way only when needed) tracking
- * clients that may have certain keys in their local, client side, cache.
- *
- * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
- * slots, however here the function we use is crc64, taking the least
- * significant 24 bits of the output.
+/* The tracking table is constituted by a radix tree of keys, each pointing
+ * to a radix tree of client IDs, used to track the clients that may have
+ * certain keys in their local, client side, cache.
*
* When a client enables tracking with "CLIENT TRACKING on", each key served to
- * the client is hashed to one of such slots, and Redis will remember what
- * client may have keys about such slot. Later, when a key in a given slot is
- * modified, all the clients that may have local copies of keys in that slot
- * will receive an invalidation message. There is no distinction of database
- * number: a single table is used.
+ * the client is remembered in the table mapping the keys to the client IDs.
+ * Later, when a key is modified, all the clients that may have local copy
+ * of such key will receive an invalidation message.
*
* Clients will normally take frequently requested objects in memory, removing
- * them when invalidation messages are received. A strategy clients may use is
- * to just cache objects in a dictionary, associating to each cached object
- * some incremental epoch, or just a timestamp. When invalidation messages are
- * received clients may store, in a different table, the timestamp (or epoch)
- * of the invalidation of such given slot: later when accessing objects, the
- * eviction of stale objects may be performed in a lazy way by checking if the
- * cached object timestamp is older than the invalidation timestamp for such
- * objects.
- *
- * The output of the 24 bit hash function is very large (more than 16 million
- * possible slots), so clients that may want to use less resources may only
- * use the most significant bits instead of the full 24 bits. */
-#define TRACKING_TABLE_SIZE (1<<24)
-rax **TrackingTable = NULL;
-unsigned long TrackingTableUsedSlots = 0;
+ * them when invalidation messages are received. */
+rax *TrackingTable = NULL;
+rax *PrefixTable = NULL;
+uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
+ the whole tracking table. This givesn
+ an hint about the total memory we
+ are using server side for CSC. */
robj *TrackingChannelName;
+/* This is the structure that we have as value of the PrefixTable, and
+ * represents the list of keys modified, and the list of clients that need
+ * to be notified, for a given prefix. */
+typedef struct bcastState {
+ rax *keys; /* Keys modified in the current event loop cycle. */
+ rax *clients; /* Clients subscribed to the notification events for this
+ prefix. */
+} bcastState;
+
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
@@ -70,9 +65,55 @@ robj *TrackingChannelName;
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
+ /* If this client is in broadcasting mode, we need to unsubscribe it
+ * from all the prefixes it is registered to. */
+ if (c->flags & CLIENT_TRACKING_BCAST) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
+ serverAssert(bs != raxNotFound);
+ raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
+ /* Was it the last client? Remove the prefix from the
+ * table. */
+ if (raxSize(bs->clients) == 0) {
+ raxFree(bs->clients);
+ raxFree(bs->keys);
+ zfree(bs);
+ raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
+ }
+ }
+ raxStop(&ri);
+ raxFree(c->client_tracking_prefixes);
+ c->client_tracking_prefixes = NULL;
+ }
+
+ /* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
server.tracking_clients--;
- c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
+ CLIENT_TRACKING_BCAST);
+ }
+}
+
+/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
+ * already registered for the specified prefix, no operation is performed. */
+void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
+ bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
+ /* If this is the first client subscribing to such prefix, create
+ * the prefix in the table. */
+ if (bs == raxNotFound) {
+ bs = zmalloc(sizeof(*bs));
+ bs->keys = raxNew();
+ bs->clients = raxNew();
+ raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
+ }
+ if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ if (c->client_tracking_prefixes == NULL)
+ c->client_tracking_prefixes = raxNew();
+ raxInsert(c->client_tracking_prefixes,
+ (unsigned char*)prefix,plen,NULL,NULL);
}
}
@@ -83,16 +124,25 @@ void disableTracking(client *c) {
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
-void enableTracking(client *c, uint64_t redirect_to) {
- if (c->flags & CLIENT_TRACKING) return;
+void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
+ if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
- c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
+ c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
c->client_tracking_redirection = redirect_to;
- server.tracking_clients++;
if (TrackingTable == NULL) {
- TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
+ TrackingTable = raxNew();
+ PrefixTable = raxNew();
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
+
+ if (bcast) {
+ c->flags |= CLIENT_TRACKING_BCAST;
+ if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
+ for (size_t j = 0; j < numprefix; j++) {
+ sds sdsprefix = prefix[j]->ptr;
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
+ }
+ }
}
/* This function is called after the excution of a readonly command in the
@@ -108,19 +158,30 @@ void trackingRememberKeys(client *c) {
for(int j = 0; j < numkeys; j++) {
int idx = keys[j];
sds sdskey = c->argv[idx]->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable[hash] == NULL) {
- TrackingTable[hash] = raxNew();
- TrackingTableUsedSlots++;
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) {
+ ids = raxNew();
+ int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
+ sdslen(sdskey),ids, NULL);
+ serverAssert(inserted == 1);
}
- raxTryInsert(TrackingTable[hash],
- (unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
+ if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
+ TrackingTableTotalItems++;
}
getKeysFreeResult(keys);
}
-void sendTrackingMessage(client *c, long long hash) {
+/* Given a key name, this function sends an invalidation message in the
+ * proper channel (depending on RESP version: PubSub or Push message) and
+ * to the proper client (in case fo redirection), in the context of the
+ * client 'c' with tracking enabled.
+ *
+ * In case the 'proto' argument is non zero, the function will assume that
+ * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
+ * form of Redis RESP protocol, representing an array of keys to send
+ * to the client as value of the invalidation. This is used in BCAST mode
+ * in order to optimized the implementation to use less CPU time. */
+void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@@ -146,36 +207,45 @@ void sendTrackingMessage(client *c, long long hash) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
+ /* We use a static object to speedup things, however we assume
+ * that addReplyPubsubMessage() will not take a reference. */
+ addReplyPubsubMessage(c,TrackingChannelName,NULL);
+ } else {
+ /* If are here, the client is not using RESP3, nor is
+ * redirecting to another client. We can't send anything to
+ * it since RESP2 does not support push messages in the same
+ * connection. */
+ return;
}
-}
-/* Invalidates a caching slot: this is actually the low level implementation
- * of the API that Redis calls externally, that is trackingInvalidateKey(). */
-void trackingInvalidateSlot(uint64_t slot) {
- if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
+ /* Send the "value" part, which is the array of keys. */
+ if (proto) {
+ addReplyProto(c,keyname,keylen);
+ } else {
+ addReplyArrayLen(c,1);
+ addReplyBulkCBuffer(c,keyname,keylen);
+ }
+}
+/* This function is called when a key is modified in Redis and in the case
+ * we have at least one client with the BCAST mode enabled.
+ * Its goal is to set the key in the right broadcast state if the key
+ * matches one or more prefixes in the prefix table. Later when we
+ * return to the event loop, we'll send invalidation messages to the
+ * clients subscribed to each prefix. */
+void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxIterator ri;
- raxStart(&ri,TrackingTable[slot]);
+ raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
- uint64_t id;
- memcpy(&id,ri.key,sizeof(id));
- client *c = lookupClientByID(id);
- if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
- sendTrackingMessage(c,slot);
+ if (ri.key_len > keylen) continue;
+ if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
+ continue;
+ bcastState *bs = ri.data;
+ raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
-
- /* Free the tracking table: we'll create the radix tree and populate it
- * again if more keys will be modified in this caching slot. */
- raxFree(TrackingTable[slot]);
- TrackingTable[slot] = NULL;
- TrackingTableUsedSlots--;
}
/* This function is called from signalModifiedKey() or other places in Redis
@@ -183,12 +253,42 @@ void trackingInvalidateSlot(uint64_t slot) {
* to send a notification to every client that may have keys about such caching
* slot. */
void trackingInvalidateKey(robj *keyobj) {
- if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
-
+ if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- trackingInvalidateSlot(hash);
+
+ if (raxSize(PrefixTable) > 0)
+ trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
+
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) return;;
+
+ raxIterator ri;
+ raxStart(&ri,ids);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ uint64_t id;
+ memcpy(&id,ri.key,sizeof(id));
+ client *c = lookupClientByID(id);
+ /* Note that if the client is in BCAST mode, we don't want to
+ * send invalidation messages that were pending in the case
+ * previously the client was not in BCAST mode. This can happen if
+ * TRACKING is enabled normally, and then the client switches to
+ * BCAST mode. */
+ if (c == NULL ||
+ !(c->flags & CLIENT_TRACKING)||
+ c->flags & CLIENT_TRACKING_BCAST)
+ {
+ continue;
+ }
+ sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
+ }
+ raxStop(&ri);
+
+ /* Free the tracking table: we'll create the radix tree and populate it
+ * again if more keys will be modified in this caching slot. */
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
}
/* This function is called when one or all the Redis databases are flushed
@@ -205,6 +305,10 @@ void trackingInvalidateKey(robj *keyobj) {
* we just send the invalidation message to all the clients, but don't
* flush the table: it will slowly get garbage collected as more keys
* are modified in the used caching slots. */
+void freeTrackingRadixTree(void *rt) {
+ raxFree(rt);
+}
+
void trackingInvalidateKeysOnFlush(int dbid) {
if (server.tracking_clients) {
listNode *ln;
@@ -213,84 +317,129 @@ void trackingInvalidateKeysOnFlush(int dbid) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
- sendTrackingMessage(c,-1);
+ sendTrackingMessage(c,"",1,0);
}
}
}
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) {
- for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
- if (TrackingTable[j] != NULL) {
- raxFree(TrackingTable[j]);
- TrackingTable[j] = NULL;
- TrackingTableUsedSlots--;
- }
- }
-
- /* If there are no clients with tracking enabled, we can even
- * reclaim the memory used by the table itself. The code assumes
- * the table is allocated only if there is at least one client alive
- * with tracking enabled. */
- if (server.tracking_clients == 0) {
- zfree(TrackingTable);
- TrackingTable = NULL;
- }
+ raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
+ TrackingTableTotalItems = 0;
}
}
/* Tracking forces Redis to remember information about which client may have
- * keys about certian caching slots. In workloads where there are a lot of
- * reads, but keys are hardly modified, the amount of information we have
- * to remember server side could be a lot: for each 16 millions of caching
- * slots we may end with a radix tree containing many entries.
+ * certain keys. In workloads where there are a lot of reads, but keys are
+ * hardly modified, the amount of information we have to remember server side
+ * could be a lot, with the number of keys being totally not bound.
*
- * So Redis allows the user to configure a maximum fill rate for the
+ * So Redis allows the user to configure a maximum number of keys for the
* invalidation table. This function makes sure that we don't go over the
* specified fill rate: if we are over, we can just evict informations about
- * random caching slots, and send invalidation messages to clients like if
- * the key was modified. */
+ * a random key, and send invalidation messages to clients like if the key was
+ * modified. */
void trackingLimitUsedSlots(void) {
static unsigned int timeout_counter = 0;
-
- if (server.tracking_table_max_fill == 0) return; /* No limits set. */
- unsigned int max_slots =
- (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
- if (TrackingTableUsedSlots <= max_slots) {
+ if (TrackingTable == NULL) return;
+ if (server.tracking_table_max_keys == 0) return; /* No limits set. */
+ size_t max_keys = server.tracking_table_max_keys;
+ if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
return; /* Limit not reached. */
}
- /* We have to invalidate a few slots to reach the limit again. The effort
+ /* We have to invalidate a few keys to reach the limit again. The effort
* we do here is proportional to the number of times we entered this
* function and found that we are still over the limit. */
int effort = 100 * (timeout_counter+1);
- /* Let's start at a random position, and perform linear probing, in order
- * to improve cache locality. However once we are able to find an used
- * slot, jump again randomly, in order to avoid creating big holes in the
- * table (that will make this funciton use more resourced later). */
+ /* We just remove one key after another by using a random walk. */
+ raxIterator ri;
+ raxStart(&ri,TrackingTable);
while(effort > 0) {
- unsigned int idx = rand() % TRACKING_TABLE_SIZE;
- do {
- effort--;
- idx = (idx+1) % TRACKING_TABLE_SIZE;
- if (TrackingTable[idx] != NULL) {
- trackingInvalidateSlot(idx);
- if (TrackingTableUsedSlots <= max_slots) {
- timeout_counter = 0;
- return; /* Return ASAP: we are again under the limit. */
- } else {
- break; /* Jump to next random position. */
- }
- }
- } while(effort > 0);
+ effort--;
+ raxSeek(&ri,"^",NULL,0);
+ raxRandomWalk(&ri,0);
+ rax *ids = ri.data;
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
+ if (raxSize(TrackingTable) <= max_keys) {
+ timeout_counter = 0;
+ raxStop(&ri);
+ return; /* Return ASAP: we are again under the limit. */
+ }
}
+
+ /* If we reach this point, we were not able to go under the configured
+ * limit using the maximum effort we had for this run. */
+ raxStop(&ri);
timeout_counter++;
}
+/* This function will run the prefixes of clients in BCAST mode and
+ * keys that were modified about each prefix, and will send the
+ * notifications to each client in each prefix. */
+void trackingBroadcastInvalidationMessages(void) {
+ raxIterator ri, ri2;
+
+ /* Return ASAP if there is nothing to do here. */
+ if (TrackingTable == NULL || !server.tracking_clients) return;
+
+ raxStart(&ri,PrefixTable);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = ri.data;
+ if (raxSize(bs->keys)) {
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ raxStart(&ri2,bs->keys);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ len = ll2string(buf,sizeof(buf),ri2.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri2.key,ri2.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri2);
+
+ /* Send this array of keys to every client in the list. */
+ raxStart(&ri2,bs->clients);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ client *c;
+ memcpy(&c,ri2.key,sizeof(c));
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
+ raxStop(&ri2);
+
+ /* Clean up: we can remove everything from this state, because we
+ * want to only track the new keys that will be accumulated starting
+ * from now. */
+ sdsfree(proto);
+ }
+ raxFree(bs->keys);
+ bs->keys = raxNew();
+ }
+ raxStop(&ri);
+}
+
/* This is just used in order to access the amount of used slots in the
* tracking table. */
-unsigned long long trackingGetUsedSlots(void) {
- return TrackingTableUsedSlots;
+uint64_t trackingGetTotalItems(void) {
+ return TrackingTableTotalItems;
+}
+
+uint64_t trackingGetTotalKeys(void) {
+ return raxSize(TrackingTable);
}