From dfe126f3e92c9770bef1915b6e64add2c41edcfa Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 10 Feb 2020 17:18:11 +0100 Subject: Tracking: BCAST: parsing of the options + skeleton. --- src/networking.c | 63 +++++++++++++++++++++++++++++++++++++++++++------------- src/server.c | 5 ++++- src/server.h | 8 ++++++- src/tracking.c | 16 +++++++++++--- 4 files changed, 73 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/networking.c b/src/networking.c index 2b0f7464a..690a134f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; + c->client_tracking_prefix_nodes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2219,38 +2220,72 @@ NULL UNIT_MILLISECONDS) != C_OK) return; pauseClients(duration); addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && - (c->argc == 3 || c->argc == 5)) - { - /* CLIENT TRACKING (on|off) [REDIRECT ] */ + } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { + /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] + * [PREFIX second] ... */ long long redir = 0; + int bcast = 0; + robj **prefix; + size_t numprefix = 0; - /* Parse the redirection option: we'll require the client with - * the specified ID to exist right now, even if it is possible - * it will get disconnected later. */ - if (c->argc == 5) { - if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) { - addReply(c,shared.syntaxerr); - return; - } else { - if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) != + /* Parse the options. */ + if (for int j = 3; j < argc; j++) { + int moreargs = (c->argc-1) - j; + + if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { + j++; + if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) != C_OK) return; + /* We will require the client with the specified ID to exist + * right now, even if it is possible that it gets disconnected + * later. Still a valid sanity check. */ if (lookupClientByID(redir) == NULL) { addReplyError(c,"The client ID you want redirect to " "does not exist"); return; } + } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { + bcast++; + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + j++; + prefix = zrealloc(sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = argv[j]; + } else { + addReply(c,shared.syntaxerr); + return; } } + /* Make sure options are compatible among each other and with the + * current state of the client. */ + if (!bcast && numprefix) { + addReplyError("PREFIX option requires BCAST mode to be enabled"); + zfree(prefix); + return; + } + + if (client->flags & CLIENT_TRACKING) { + int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (oldbcast != bcast) { + } + addReplyError( + "You can't switch BCAST mode on/off before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + + /* Options are ok: enable or disable the tracking for this client. */ if (!strcasecmp(c->argv[2]->ptr,"on")) { - enableTracking(c,redir); + enableTracking(c,redir,bcast,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); } else { addReply(c,shared.syntaxerr); return; } + zfree(prefix); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { /* CLIENT GETREDIR */ diff --git a/src/server.c b/src/server.c index 910cf5410..1001fa4f7 100644 --- a/src/server.c +++ b/src/server.c @@ -3310,8 +3310,11 @@ void call(client *c, int flags) { if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? server.lua_caller : c; - if (caller->flags & CLIENT_TRACKING) + if (caller->flags & CLIENT_TRACKING && + !(caller->flags & CLIENT_TRACKING_BCAST)) + { trackingRememberKeys(caller); + } } server.fixed_time_expire--; diff --git a/src/server.h b/src/server.h index 3e055a7db..d3ca0d01b 100644 --- a/src/server.h +++ b/src/server.h @@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ +#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -822,6 +823,11 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; + list *client_tracking_prefix_nodes; /* This list contains listNode pointers + to the nodes we have in every list + of clients in the tracking bcast + table. This way we can remove our + client in O(1) for each list. */ /* Response buffer */ int bufpos; @@ -1648,7 +1654,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...); #endif /* Client side caching (tracking mode) */ -void enableTracking(client *c, uint64_t redirect_to); +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); diff --git a/src/tracking.c b/src/tracking.c index 3122563ac..413b21328 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -42,6 +42,7 @@ * Clients will normally take frequently requested objects in memory, removing * them when invalidation messages are received. */ rax *TrackingTable = NULL; +rax *PrefixTable = NULL; uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across the whole tracking table. This givesn an hint about the total memory we @@ -68,16 +69,25 @@ void disableTracking(client *c) { * eventually get freed, we'll send a message to the original client to * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ -void enableTracking(client *c, uint64_t redirect_to) { - if (c->flags & CLIENT_TRACKING) return; +void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { c->flags |= CLIENT_TRACKING; c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; c->client_tracking_redirection = redirect_to; - server.tracking_clients++; + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; if (TrackingTable == NULL) { TrackingTable = raxNew(); + PrefixTable = raxNew(); TrackingChannelName = createStringObject("__redis__:invalidate",20); } + + if (bcast) { + c->flags |= CLIENT_TRACKING_BCAST; + if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); + for (int j = 0; j < numprefix; j++) { + sds sdsprefix = prefix[j]->ptr; + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + } + } } /* This function is called after the excution of a readonly command in the -- cgit v1.2.1