diff options
Diffstat (limited to 'src/tracking.c')
-rw-r--r-- | src/tracking.c | 118 |
1 files changed, 87 insertions, 31 deletions
diff --git a/src/tracking.c b/src/tracking.c index 6f7929430..434e086b5 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -245,7 +245,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { * matches one or more prefixes in the prefix table. Later when we * return to the event loop, we'll send invalidation messages to the * clients subscribed to each prefix. */ -void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { +void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { raxIterator ri; raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); @@ -254,7 +254,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) continue; bcastState *bs = ri.data; - raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + /* We insert the client pointer as associated value in the radix + * tree. This way we know who was the client that did the last + * change to the key, and can avoid sending the notification in the + * case the client is in NOLOOP mode. */ + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL); } raxStop(&ri); } @@ -262,13 +266,17 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching - * slot. */ -void trackingInvalidateKey(robj *keyobj) { + * slot. + * + * Note that 'c' may be NULL in case the operation was performed outside the + * context of a client modifying the database (for instance when we delete a + * key because of expire). */ +void trackingInvalidateKey(client *c, robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; if (raxSize(PrefixTable) > 0) - trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return; @@ -279,19 +287,28 @@ void trackingInvalidateKey(robj *keyobj) { while(raxNext(&ri)) { uint64_t id; memcpy(&id,ri.key,sizeof(id)); - client *c = lookupClientByID(id); + client *target = lookupClientByID(id); /* Note that if the client is in BCAST mode, we don't want to * send invalidation messages that were pending in the case * previously the client was not in BCAST mode. This can happen if * TRACKING is enabled normally, and then the client switches to * BCAST mode. */ - if (c == NULL || - !(c->flags & CLIENT_TRACKING)|| - c->flags & CLIENT_TRACKING_BCAST) + if (target == NULL || + !(target->flags & CLIENT_TRACKING)|| + target->flags & CLIENT_TRACKING_BCAST) { continue; } - sendTrackingMessage(c,sdskey,sdslen(sdskey),0); + + /* If the client enabled the NOLOOP mode, don't send notifications + * about keys changed by the client itself. */ + if (target->flags & CLIENT_TRACKING_NOLOOP && + target == c) + { + continue; + } + + sendTrackingMessage(target,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -383,6 +400,54 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* Generate Redis protocol for an array containing all the key names + * in the 'keys' radix tree. If the client is not NULL, the list will not + * include keys that were modified the last time by this client, in order + * to implement the NOLOOP option. + * + * If the resultin array would be empty, NULL is returned instead. */ +sds trackingBuildBroadcastReply(client *c, rax *keys) { + raxIterator ri; + uint64_t count; + + if (c == NULL) { + count = raxSize(keys); + } else { + count = 0; + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (ri.data != c) count++; + } + raxStop(&ri); + + if (count == 0) return NULL; + } + + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),count); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,count*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (c && ri.data == c) continue; + len = ll2string(buf,sizeof(buf),ri.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri.key,ri.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri); + return proto; +} + /* This function will run the prefixes of clients in BCAST mode and * keys that were modified about each prefix, and will send the * notifications to each client in each prefix. */ @@ -397,26 +462,10 @@ void trackingBroadcastInvalidationMessages(void) { while(raxNext(&ri)) { bcastState *bs = ri.data; if (raxSize(bs->keys)) { - /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. */ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); - sds proto = sdsempty(); - proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); - proto = sdscatlen(proto,"*",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - raxStart(&ri2,bs->keys); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - len = ll2string(buf,sizeof(buf),ri2.key_len); - proto = sdscatlen(proto,"$",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - proto = sdscatlen(proto,ri2.key,ri2.key_len); - proto = sdscatlen(proto,"\r\n",2); - } - raxStop(&ri2); + + /* Generate the common protocol for all the clients that are + * not using the NOLOOP option. */ + sds proto = trackingBuildBroadcastReply(NULL,bs->keys); /* Send this array of keys to every client in the list. */ raxStart(&ri2,bs->clients); @@ -424,7 +473,14 @@ void trackingBroadcastInvalidationMessages(void) { while(raxNext(&ri2)) { client *c; memcpy(&c,ri2.key,sizeof(c)); - sendTrackingMessage(c,proto,sdslen(proto),1); + if (c->flags & CLIENT_TRACKING_NOLOOP) { + /* This client may have certain keys excluded. */ + sds adhoc = trackingBuildBroadcastReply(c,bs->keys); + sendTrackingMessage(c,adhoc,sdslen(adhoc),1); + sdsfree(adhoc); + } else { + sendTrackingMessage(c,proto,sdslen(proto),1); + } } raxStop(&ri2); |