summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-02-10 17:18:11 +0100
committerantirez <antirez@gmail.com>2020-02-10 17:18:11 +0100
commitdfe126f3e92c9770bef1915b6e64add2c41edcfa (patch)
treeb6ccb8f3789a58d811bc979b405530a34f76b9fd /src
parentf53cc00c09a4e7c612b3781021246cbbeb533d7b (diff)
downloadredis-dfe126f3e92c9770bef1915b6e64add2c41edcfa.tar.gz
Tracking: BCAST: parsing of the options + skeleton.
Diffstat (limited to 'src')
-rw-r--r--src/networking.c63
-rw-r--r--src/server.c5
-rw-r--r--src/server.h8
-rw-r--r--src/tracking.c16
4 files changed, 73 insertions, 19 deletions
diff --git a/src/networking.c b/src/networking.c
index 2b0f7464a..690a134f1 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -154,6 +154,7 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->client_list_node = NULL;
c->client_tracking_redirection = 0;
+ c->client_tracking_prefix_nodes = NULL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
@@ -2219,38 +2220,72 @@ NULL
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
- } else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
- (c->argc == 3 || c->argc == 5))
- {
- /* CLIENT TRACKING (on|off) [REDIRECT <id>] */
+ } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
+ /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
+ * [PREFIX second] ... */
long long redir = 0;
+ int bcast = 0;
+ robj **prefix;
+ size_t numprefix = 0;
- /* Parse the redirection option: we'll require the client with
- * the specified ID to exist right now, even if it is possible
- * it will get disconnected later. */
- if (c->argc == 5) {
- if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
- addReply(c,shared.syntaxerr);
- return;
- } else {
- if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
+ /* Parse the options. */
+ if (for int j = 3; j < argc; j++) {
+ int moreargs = (c->argc-1) - j;
+
+ if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
+ j++;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
C_OK) return;
+ /* We will require the client with the specified ID to exist
+ * right now, even if it is possible that it gets disconnected
+ * later. Still a valid sanity check. */
if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to "
"does not exist");
return;
}
+ } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
+ bcast++;
+ } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) {
+ j++;
+ prefix = zrealloc(sizeof(robj*)*(numprefix+1));
+ prefix[numprefix++] = argv[j];
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
}
}
+ /* Make sure options are compatible among each other and with the
+ * current state of the client. */
+ if (!bcast && numprefix) {
+ addReplyError("PREFIX option requires BCAST mode to be enabled");
+ zfree(prefix);
+ return;
+ }
+
+ if (client->flags & CLIENT_TRACKING) {
+ int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST;
+ if (oldbcast != bcast) {
+ }
+ addReplyError(
+ "You can't switch BCAST mode on/off before disabling "
+ "tracking for this client, and then re-enabling it with "
+ "a different mode.");
+ zfree(prefix);
+ return;
+ }
+
+ /* Options are ok: enable or disable the tracking for this client. */
if (!strcasecmp(c->argv[2]->ptr,"on")) {
- enableTracking(c,redir);
+ enableTracking(c,redir,bcast,prefix,numprefix);
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
disableTracking(c);
} else {
addReply(c,shared.syntaxerr);
return;
}
+ zfree(prefix);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
/* CLIENT GETREDIR */
diff --git a/src/server.c b/src/server.c
index 910cf5410..1001fa4f7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3310,8 +3310,11 @@ void call(client *c, int flags) {
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
server.lua_caller : c;
- if (caller->flags & CLIENT_TRACKING)
+ if (caller->flags & CLIENT_TRACKING &&
+ !(caller->flags & CLIENT_TRACKING_BCAST))
+ {
trackingRememberKeys(caller);
+ }
}
server.fixed_time_expire--;
diff --git a/src/server.h b/src/server.h
index 3e055a7db..d3ca0d01b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
+#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -822,6 +823,11 @@ typedef struct client {
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
+ list *client_tracking_prefix_nodes; /* This list contains listNode pointers
+ to the nodes we have in every list
+ of clients in the tracking bcast
+ table. This way we can remove our
+ client in O(1) for each list. */
/* Response buffer */
int bufpos;
@@ -1648,7 +1654,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
/* Client side caching (tracking mode) */
-void enableTracking(client *c, uint64_t redirect_to);
+void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
diff --git a/src/tracking.c b/src/tracking.c
index 3122563ac..413b21328 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -42,6 +42,7 @@
* Clients will normally take frequently requested objects in memory, removing
* them when invalidation messages are received. */
rax *TrackingTable = NULL;
+rax *PrefixTable = NULL;
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
the whole tracking table. This givesn
an hint about the total memory we
@@ -68,16 +69,25 @@ void disableTracking(client *c) {
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
-void enableTracking(client *c, uint64_t redirect_to) {
- if (c->flags & CLIENT_TRACKING) return;
+void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
c->flags |= CLIENT_TRACKING;
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->client_tracking_redirection = redirect_to;
- server.tracking_clients++;
+ if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
if (TrackingTable == NULL) {
TrackingTable = raxNew();
+ PrefixTable = raxNew();
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
+
+ if (bcast) {
+ c->flags |= CLIENT_TRACKING_BCAST;
+ if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
+ for (int j = 0; j < numprefix; j++) {
+ sds sdsprefix = prefix[j]->ptr;
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix));
+ }
+ }
}
/* This function is called after the excution of a readonly command in the