summaryrefslogtreecommitdiff
path: root/src/tracking.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-02-11 17:26:27 +0100
committerantirez <antirez@gmail.com>2020-02-11 17:26:29 +0100
commit3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9 (patch)
treeff50e42db861bf169562a5106f5525f247c19453 /src/tracking.c
parentdfe126f3e92c9770bef1915b6e64add2c41edcfa (diff)
downloadredis-3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9.tar.gz
Tracking: BCAST: registration in the prefix table.
Diffstat (limited to 'src/tracking.c')
-rw-r--r--src/tracking.c57
1 files changed, 54 insertions, 3 deletions
diff --git a/src/tracking.c b/src/tracking.c
index 413b21328..9f46275a4 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
are using server side for CSC. */
robj *TrackingChannelName;
+/* This is the structure that we have as value of the PrefixTable, and
+ * represents the list of keys modified, and the list of clients that need
+ * to be notified, for a given prefix. */
+typedef struct bcastState {
+ rax *keys; /* Keys modified in the current event loop cycle. */
+ rax *clients; /* Clients subscribed to the notification events for this
+ prefix. */
+} bcastState;
+
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
@@ -56,9 +65,51 @@ robj *TrackingChannelName;
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
+ /* If this client is in broadcasting mode, we need to unsubscribe it
+ * from all the prefixes it is registered to. */
+ if (c->flags & CLIENT_TRACKING_BCAST) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
+ serverAssert(bs != raxNotFound);
+ raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
+ /* Was it the last client? Remove the prefix from the
+ * table. */
+ if (raxSize(bs->clients) == 0) {
+ raxFree(bs->clients);
+ raxFree(bs->keys);
+ zfree(bs);
+ raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
+ }
+ }
+ raxStop(&ri);
+ }
+
+ /* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
server.tracking_clients--;
- c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
+ CLIENT_TRACKING_BCAST);
+ }
+}
+
+/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
+ * already registered for the specified prefix, no operation is performed. */
+void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
+ bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
+ /* If this is the first client subscribing to such prefix, create
+ * the prefix in the table. */
+ if (bs == raxNotFound) {
+ bs = zmalloc(sizeof(*bs));
+ bs->keys = raxNew();
+ bs->clients = raxNew();
+ raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
+ }
+ if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ raxInsert(c->client_tracking_prefixes,
+ (unsigned char*)prefix,plen,NULL,NULL);
}
}
@@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
if (bcast) {
c->flags |= CLIENT_TRACKING_BCAST;
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
- for (int j = 0; j < numprefix; j++) {
+ for (size_t j = 0; j < numprefix; j++) {
sds sdsprefix = prefix[j]->ptr;
- enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix));
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
}
}
}