summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-02-11 17:26:27 +0100
committerantirez <antirez@gmail.com>2020-02-11 17:26:29 +0100
commit3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9 (patch)
treeff50e42db861bf169562a5106f5525f247c19453
parentdfe126f3e92c9770bef1915b6e64add2c41edcfa (diff)
downloadredis-3f7ba86255b9d6acd73dd39cc8f05d3d3f8741a9.tar.gz
Tracking: BCAST: registration in the prefix table.
-rw-r--r--src/networking.c21
-rw-r--r--src/server.h9
-rw-r--r--src/tracking.c57
3 files changed, 67 insertions, 20 deletions
diff --git a/src/networking.c b/src/networking.c
index 690a134f1..344b76260 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -154,7 +154,7 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->client_list_node = NULL;
c->client_tracking_redirection = 0;
- c->client_tracking_prefix_nodes = NULL;
+ c->client_tracking_prefixes = NULL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
@@ -2028,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) {
void clientCommand(client *c) {
listNode *ln;
listIter li;
- client *client;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
@@ -2142,7 +2141,7 @@ NULL
/* Iterate clients killing all the matching clients. */
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
- client = listNodeValue(ln);
+ client *client = listNodeValue(ln);
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
if (type != -1 && getClientType(client) != type) continue;
if (id != 0 && client->id != id) continue;
@@ -2229,7 +2228,7 @@ NULL
size_t numprefix = 0;
/* Parse the options. */
- if (for int j = 3; j < argc; j++) {
+ for (int j = 3; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
@@ -2246,10 +2245,10 @@ NULL
}
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
bcast++;
- } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) {
+ } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
j++;
- prefix = zrealloc(sizeof(robj*)*(numprefix+1));
- prefix[numprefix++] = argv[j];
+ prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
+ prefix[numprefix++] = c->argv[j];
} else {
addReply(c,shared.syntaxerr);
return;
@@ -2259,16 +2258,16 @@ NULL
/* Make sure options are compatible among each other and with the
* current state of the client. */
if (!bcast && numprefix) {
- addReplyError("PREFIX option requires BCAST mode to be enabled");
+ addReplyError(c,"PREFIX option requires BCAST mode to be enabled");
zfree(prefix);
return;
}
- if (client->flags & CLIENT_TRACKING) {
- int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST;
+ if (c->flags & CLIENT_TRACKING) {
+ int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST;
if (oldbcast != bcast) {
}
- addReplyError(
+ addReplyError(c,
"You can't switch BCAST mode on/off before disabling "
"tracking for this client, and then re-enabling it with "
"a different mode.");
diff --git a/src/server.h b/src/server.h
index d3ca0d01b..725c3cbc8 100644
--- a/src/server.h
+++ b/src/server.h
@@ -823,12 +823,9 @@ typedef struct client {
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
- list *client_tracking_prefix_nodes; /* This list contains listNode pointers
- to the nodes we have in every list
- of clients in the tracking bcast
- table. This way we can remove our
- client in O(1) for each list. */
-
+ rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
+ subscribed to in BCAST mode, in the
+ context of client side caching. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
diff --git a/src/tracking.c b/src/tracking.c
index 413b21328..9f46275a4 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
are using server side for CSC. */
robj *TrackingChannelName;
+/* This is the structure that we have as value of the PrefixTable, and
+ * represents the list of keys modified, and the list of clients that need
+ * to be notified, for a given prefix. */
+typedef struct bcastState {
+ rax *keys; /* Keys modified in the current event loop cycle. */
+ rax *clients; /* Clients subscribed to the notification events for this
+ prefix. */
+} bcastState;
+
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
@@ -56,9 +65,51 @@ robj *TrackingChannelName;
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
+ /* If this client is in broadcasting mode, we need to unsubscribe it
+ * from all the prefixes it is registered to. */
+ if (c->flags & CLIENT_TRACKING_BCAST) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
+ serverAssert(bs != raxNotFound);
+ raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
+ /* Was it the last client? Remove the prefix from the
+ * table. */
+ if (raxSize(bs->clients) == 0) {
+ raxFree(bs->clients);
+ raxFree(bs->keys);
+ zfree(bs);
+ raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
+ }
+ }
+ raxStop(&ri);
+ }
+
+ /* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
server.tracking_clients--;
- c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
+ CLIENT_TRACKING_BCAST);
+ }
+}
+
+/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
+ * already registered for the specified prefix, no operation is performed. */
+void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
+ bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
+ /* If this is the first client subscribing to such prefix, create
+ * the prefix in the table. */
+ if (bs == raxNotFound) {
+ bs = zmalloc(sizeof(*bs));
+ bs->keys = raxNew();
+ bs->clients = raxNew();
+ raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
+ }
+ if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ raxInsert(c->client_tracking_prefixes,
+ (unsigned char*)prefix,plen,NULL,NULL);
}
}
@@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
if (bcast) {
c->flags |= CLIENT_TRACKING_BCAST;
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
- for (int j = 0; j < numprefix; j++) {
+ for (size_t j = 0; j < numprefix; j++) {
sds sdsprefix = prefix[j]->ptr;
- enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix));
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
}
}
}