summaryrefslogtreecommitdiff
path: root/src/tracking.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-04-21 10:51:46 +0200
committerantirez <antirez@gmail.com>2020-04-21 10:51:46 +0200
commit94f2e7f9f9f7e6eca8f8bd7ae412c34806e68351 (patch)
tree2927805c22e495387d171df16cd530147245a02b /src/tracking.c
parentc7db333abb0e45ff8974ef2d1fc4f9ae1e7be1e2 (diff)
downloadredis-94f2e7f9f9f7e6eca8f8bd7ae412c34806e68351.tar.gz
Tracking: NOLOOP internals implementation.
Diffstat (limited to 'src/tracking.c')
-rw-r--r--src/tracking.c118
1 files changed, 87 insertions, 31 deletions
diff --git a/src/tracking.c b/src/tracking.c
index 6f7929430..434e086b5 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -245,7 +245,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* matches one or more prefixes in the prefix table. Later when we
* return to the event loop, we'll send invalidation messages to the
* clients subscribed to each prefix. */
-void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
+void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
raxIterator ri;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
@@ -254,7 +254,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue;
bcastState *bs = ri.data;
- raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
+ /* We insert the client pointer as associated value in the radix
+ * tree. This way we know who was the client that did the last
+ * change to the key, and can avoid sending the notification in the
+ * case the client is in NOLOOP mode. */
+ raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL);
}
raxStop(&ri);
}
@@ -262,13 +266,17 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
/* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such caching
- * slot. */
-void trackingInvalidateKey(robj *keyobj) {
+ * slot.
+ *
+ * Note that 'c' may be NULL in case the operation was performed outside the
+ * context of a client modifying the database (for instance when we delete a
+ * key because of expire). */
+void trackingInvalidateKey(client *c, robj *keyobj) {
if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
if (raxSize(PrefixTable) > 0)
- trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
+ trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey));
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
if (ids == raxNotFound) return;
@@ -279,19 +287,28 @@ void trackingInvalidateKey(robj *keyobj) {
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,sizeof(id));
- client *c = lookupClientByID(id);
+ client *target = lookupClientByID(id);
/* Note that if the client is in BCAST mode, we don't want to
* send invalidation messages that were pending in the case
* previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to
* BCAST mode. */
- if (c == NULL ||
- !(c->flags & CLIENT_TRACKING)||
- c->flags & CLIENT_TRACKING_BCAST)
+ if (target == NULL ||
+ !(target->flags & CLIENT_TRACKING)||
+ target->flags & CLIENT_TRACKING_BCAST)
{
continue;
}
- sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
+
+ /* If the client enabled the NOLOOP mode, don't send notifications
+ * about keys changed by the client itself. */
+ if (target->flags & CLIENT_TRACKING_NOLOOP &&
+ target == c)
+ {
+ continue;
+ }
+
+ sendTrackingMessage(target,sdskey,sdslen(sdskey),0);
}
raxStop(&ri);
@@ -383,6 +400,54 @@ void trackingLimitUsedSlots(void) {
timeout_counter++;
}
+/* Generate Redis protocol for an array containing all the key names
+ * in the 'keys' radix tree. If the client is not NULL, the list will not
+ * include keys that were modified the last time by this client, in order
+ * to implement the NOLOOP option.
+ *
+ * If the resultin array would be empty, NULL is returned instead. */
+sds trackingBuildBroadcastReply(client *c, rax *keys) {
+ raxIterator ri;
+ uint64_t count;
+
+ if (c == NULL) {
+ count = raxSize(keys);
+ } else {
+ count = 0;
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (ri.data != c) count++;
+ }
+ raxStop(&ri);
+
+ if (count == 0) return NULL;
+ }
+
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),count);
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,count*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (c && ri.data == c) continue;
+ len = ll2string(buf,sizeof(buf),ri.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri.key,ri.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri);
+ return proto;
+}
+
/* This function will run the prefixes of clients in BCAST mode and
* keys that were modified about each prefix, and will send the
* notifications to each client in each prefix. */
@@ -397,26 +462,10 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri)) {
bcastState *bs = ri.data;
if (raxSize(bs->keys)) {
- /* Create the array reply with the list of keys once, then send
- * it to all the clients subscribed to this prefix. */
- char buf[32];
- size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
- sds proto = sdsempty();
- proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
- proto = sdscatlen(proto,"*",1);
- proto = sdscatlen(proto,buf,len);
- proto = sdscatlen(proto,"\r\n",2);
- raxStart(&ri2,bs->keys);
- raxSeek(&ri2,"^",NULL,0);
- while(raxNext(&ri2)) {
- len = ll2string(buf,sizeof(buf),ri2.key_len);
- proto = sdscatlen(proto,"$",1);
- proto = sdscatlen(proto,buf,len);
- proto = sdscatlen(proto,"\r\n",2);
- proto = sdscatlen(proto,ri2.key,ri2.key_len);
- proto = sdscatlen(proto,"\r\n",2);
- }
- raxStop(&ri2);
+
+ /* Generate the common protocol for all the clients that are
+ * not using the NOLOOP option. */
+ sds proto = trackingBuildBroadcastReply(NULL,bs->keys);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
@@ -424,7 +473,14 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
- sendTrackingMessage(c,proto,sdslen(proto),1);
+ if (c->flags & CLIENT_TRACKING_NOLOOP) {
+ /* This client may have certain keys excluded. */
+ sds adhoc = trackingBuildBroadcastReply(c,bs->keys);
+ sendTrackingMessage(c,adhoc,sdslen(adhoc),1);
+ sdsfree(adhoc);
+ } else {
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
}
raxStop(&ri2);