diff options
author | antirez <antirez@gmail.com> | 2020-02-11 17:26:27 +0100 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-02-11 17:26:29 +0100 |
commit | 3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9 (patch) | |
tree | ff50e42db861bf169562a5106f5525f247c19453 /src/tracking.c | |
parent | dfe126f3e92c9770bef1915b6e64add2c41edcfa (diff) | |
download | redis-3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9.tar.gz |
Tracking: BCAST: registration in the prefix table.
Diffstat (limited to 'src/tracking.c')
-rw-r--r-- | src/tracking.c | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/src/tracking.c b/src/tracking.c index 413b21328..9f46275a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -56,9 +65,51 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + } + + /* Clear flags and adjust the count. */ if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s if (bcast) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); - for (int j = 0; j < numprefix; j++) { + for (size_t j = 0; j < numprefix; j++) { sds sdsprefix = prefix[j]->ptr; - enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } } |