summaryrefslogtreecommitdiff
path: root/src/tracking.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-02-12 19:22:04 +0100
committerantirez <antirez@gmail.com>2020-02-12 19:22:04 +0100
commit40194a2a6809520b5f01da4a7b41afe2a2441f64 (patch)
treeeb7ae44d4225f2b3f0e7ea774710246251b7feb6 /src/tracking.c
parent71f3f3f1afe4fbb6f8634970258b5dec2d389c68 (diff)
downloadredis-40194a2a6809520b5f01da4a7b41afe2a2441f64.tar.gz
Tracking: BCAST: basic feature now works.
Diffstat (limited to 'src/tracking.c')
-rw-r--r--src/tracking.c89
1 files changed, 50 insertions, 39 deletions
diff --git a/src/tracking.c b/src/tracking.c
index 345c5f1ad..672b886a3 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -85,6 +85,8 @@ void disableTracking(client *c) {
}
}
raxStop(&ri);
+ raxFree(c->client_tracking_prefixes);
+ c->client_tracking_prefixes = NULL;
}
/* Clear flags and adjust the count. */
@@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
}
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ if (c->client_tracking_prefixes == NULL)
+ c->client_tracking_prefixes = raxNew();
raxInsert(c->client_tracking_prefixes,
(unsigned char*)prefix,plen,NULL,NULL);
}
@@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
+ if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
- c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
+ c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
c->client_tracking_redirection = redirect_to;
- if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
if (TrackingTable == NULL) {
TrackingTable = raxNew();
PrefixTable = raxNew();
@@ -229,10 +233,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
- if (keylen > ri.key_len) continue;
- if (memcmp(ri.key,keyname,ri.key_len) != 0) continue;
- bcastState *bs = ri.data;
- raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
+ if (ri.key_len > keylen) continue;
+ if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
+ continue;
+ bcastState *bs = ri.data;
+ raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
}
@@ -362,46 +367,52 @@ void trackingLimitUsedSlots(void) {
* notifications to each client in each prefix. */
void trackingBroadcastInvalidationMessages(void) {
raxIterator ri, ri2;
+
+ /* Return ASAP if there is nothing to do here. */
+ if (TrackingTable == NULL || !server.tracking_clients) return;
+
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = ri.data;
- /* Create the array reply with the list of keys once, then send
- * it to all the clients subscribed to this prefix. */
- char buf[32];
- size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
- sds proto = sdsempty();
- proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
- proto = sdscatlen(proto,"*",1);
- proto = sdscatlen(proto,buf,len);
- proto = sdscatlen(proto,"\r\n",2);
- raxStart(&ri2,bs->keys);
- raxSeek(&ri2,"^",NULL,0);
- while(raxNext(&ri2)) {
- len = ll2string(buf,sizeof(buf),ri2.key_len);
- sds proto = sdsnewlen("$",1);
- proto = sdscatlen(proto,ri2.key,ri2.key_len);
+ if (raxSize(bs->keys)) {
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
- }
- raxStop(&ri2);
-
- /* Send this array of keys to every client in the list. */
- raxStart(&ri2,bs->clients);
- raxSeek(&ri2,"^",NULL,0);
- while(raxNext(&ri2)) {
- client *c;
- memcpy(&c,ri2.key,sizeof(c));
- sendTrackingMessage(c,proto,sdslen(proto),1);
- }
- raxStop(&ri2);
+ raxStart(&ri2,bs->keys);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ len = ll2string(buf,sizeof(buf),ri2.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri2.key,ri2.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri2);
+
+ /* Send this array of keys to every client in the list. */
+ raxStart(&ri2,bs->clients);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ client *c;
+ memcpy(&c,ri2.key,sizeof(c));
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
+ raxStop(&ri2);
- /* Clean up: we can remove everything from this state, because we
- * want to only track the new keys that will be accumulated starting
- * from now. */
- sdsfree(proto);
- raxFree(bs->clients);
+ /* Clean up: we can remove everything from this state, because we
+ * want to only track the new keys that will be accumulated starting
+ * from now. */
+ sdsfree(proto);
+ }
raxFree(bs->keys);
- bs->clients = raxNew();
bs->keys = raxNew();
}
raxStop(&ri);