summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile2
-rw-r--r--src/db.c1
-rw-r--r--src/debug.c2
-rw-r--r--src/expire.c1
-rw-r--r--src/networking.c47
-rw-r--r--src/server.c11
-rw-r--r--src/server.h28
-rw-r--r--src/tracking.c162
8 files changed, 246 insertions, 8 deletions
diff --git a/src/Makefile b/src/Makefile
index f35685eff..e608309f8 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -164,7 +164,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.c
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/db.c b/src/db.c
index b537a29a4..4977873e9 100644
--- a/src/db.c
+++ b/src/db.c
@@ -399,6 +399,7 @@ int selectDb(client *c, int id) {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
+ if (server.tracking_clients) trackingInvalidateKey(key);
}
void signalFlushedDb(int dbid) {
diff --git a/src/debug.c b/src/debug.c
index 0c6b5630c..1f1157d4a 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -702,7 +702,7 @@ void _serverAssertPrintClientInfo(const client *c) {
bugReportStart();
serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
- serverLog(LL_WARNING,"client->flags = %d", c->flags);
+ serverLog(LL_WARNING,"client->flags = %llu", (unsigned long long)c->flags);
serverLog(LL_WARNING,"client->fd = %d", c->fd);
serverLog(LL_WARNING,"client->argc = %d", c->argc);
for (j=0; j < c->argc; j++) {
diff --git a/src/expire.c b/src/expire.c
index 0b92ee3fe..b23117a3c 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,6 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
+ if (server.tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
diff --git a/src/networking.c b/src/networking.c
index 4bc22120a..716b35859 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -158,6 +158,7 @@ client *createClient(int fd) {
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
+ c->client_tracking_redirection = 0;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) linkClient(c);
@@ -966,6 +967,9 @@ void unlinkClient(client *c) {
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;
}
+
+ /* Clear the tracking status. */
+ if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
void freeClient(client *c) {
@@ -1849,6 +1853,8 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
if (client->flags & CLIENT_MULTI) *p++ = 'x';
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
+ if (client->flags & CLIENT_TRACKING) *p++ = 't';
+ if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
@@ -1961,6 +1967,7 @@ void clientCommand(client *c) {
"reply (on|off|skip) -- Control the replies sent to the current connection.",
"setname <name> -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
+"tracking (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
NULL
};
addReplyHelp(c, help);
@@ -2117,20 +2124,56 @@ NULL
addReply(c,shared.czero);
}
} else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) {
+ /* CLIENT SETNAME */
if (clientSetNameOrReply(c,c->argv[2]) == C_OK)
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) {
+ /* CLIENT GETNAME */
if (c->name)
addReplyBulk(c,c->name);
else
addReplyNull(c);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
+ /* CLIENT PAUSE */
long long duration;
- if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
- != C_OK) return;
+ if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,
+ UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
+ (c->argc == 3 || c->argc == 5))
+ {
+ /* CLIENT TRACKING (on|off) [REDIRECT <id>] */
+ long long redir = 0;
+
+ /* Parse the redirection option: we'll require the client with
+ * the specified ID to exist right now, even if it is possible
+ * it will get disconnected later. */
+ if (c->argc == 5) {
+ if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
+ addReply(c,shared.syntaxerr);
+ return;
+ } else {
+ if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
+ C_OK) return;
+ if (lookupClientByID(redir) == NULL) {
+ addReplyError(c,"The client ID you want redirect to "
+ "does not exist");
+ return;
+ }
+ }
+ }
+
+ if (!strcasecmp(c->argv[2]->ptr,"on")) {
+ enableTracking(c,redir);
+ } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
+ disableTracking(c);
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ addReply(c,shared.ok);
} else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLIENT HELP", (char*)c->argv[1]->ptr);
}
diff --git a/src/server.c b/src/server.c
index e1d48e596..78b8d8f1b 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3194,6 +3194,7 @@ void call(client *c, int flags) {
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
+
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
@@ -3261,6 +3262,16 @@ void call(client *c, int flags) {
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
+
+ /* If the client has keys tracking enabled for client side caching,
+ * make sure to remember the keys it fetched via this command. */
+ if (c->cmd->flags & CMD_READONLY) {
+ client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
+ server.lua_caller : c;
+ if (caller->flags & CLIENT_TRACKING)
+ trackingRememberKeys(caller);
+ }
+
server.stat_numcommands++;
}
diff --git a/src/server.h b/src/server.h
index 0813f8bd1..cd6652257 100644
--- a/src/server.h
+++ b/src/server.h
@@ -254,8 +254,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
/* Client flags */
-#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
-#define CLIENT_MASTER (1<<1) /* This client is a master server */
+#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */
+#define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
@@ -289,7 +289,13 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
-#define CLIENT_PENDING_COMMAND (1<<30) /* */
+#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after
+ we return single threaded that the
+ client has already pending commands
+ to be executed. */
+#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to
+ perform client side caching. */
+#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -816,7 +822,7 @@ typedef struct client {
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
- int flags; /* Client flags: CLIENT_* macros. */
+ uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
@@ -845,6 +851,11 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */
+ /* If this client is in tracking mode and this field is non zero,
+ * invalidation messages for keys fetched by this client will be send to
+ * the specified client ID. */
+ uint64_t client_tracking_redirection;
+
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
@@ -1286,6 +1297,8 @@ struct redisServer {
unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
+ /* Client side caching. */
+ unsigned int tracking_clients; /* # of clients with tracking enabled.*/
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@@ -1591,6 +1604,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
+client *lookupClientByID(uint64_t id);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@@ -1602,6 +1616,12 @@ void addReplyErrorFormat(client *c, const char *fmt, ...);
void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
+/* Client side caching (tracking mode) */
+void enableTracking(client *c, uint64_t redirect_to);
+void disableTracking(client *c);
+void trackingRememberKeys(client *c);
+void trackingInvalidateKey(robj *keyobj);
+
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
diff --git a/src/tracking.c b/src/tracking.c
new file mode 100644
index 000000000..aade137c4
--- /dev/null
+++ b/src/tracking.c
@@ -0,0 +1,162 @@
+/* tracking.c - Client side caching: keys tracking and invalidation
+ *
+ * Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+
+/* The tracking table is constituted by 2^24 radix trees (each tree, and the
+ * table itself, are allocated in a lazy way only when needed) tracking
+ * clients that may have certain keys in their local, client side, cache.
+ *
+ * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
+ * slots, however here the function we use is crc64, taking the least
+ * significant 24 bits of the output.
+ *
+ * When a client enables tracking with "CLIENT TRACKING on", each key served to
+ * the client is hashed to one of such slots, and Redis will remember what
+ * client may have keys about such slot. Later, when a key in a given slot is
+ * modified, all the clients that may have local copies of keys in that slot
+ * will receive an invalidation message. There is no distinction of database
+ * number: a single table is used.
+ *
+ * Clients will normally take frequently requested objects in memory, removing
+ * them when invalidation messages are received. A strategy clients may use is
+ * to just cache objects in a dictionary, associating to each cached object
+ * some incremental epoch, or just a timestamp. When invalidation messages are
+ * received clients may store, in a different table, the timestamp (or epoch)
+ * of the invalidation of such given slot: later when accessing objects, the
+ * eviction of stale objects may be performed in a lazy way by checking if the
+ * cached object timestamp is older than the invalidation timestamp for such
+ * objects.
+ *
+ * The output of the 24 bit hash function is very large (more than 16 million
+ * possible slots), so clients that may want to use less resources may only
+ * use the most significant bits instead of the full 24 bits. */
+#define TRACKING_TABLE_SIZE (1<<24)
+rax **TrackingTable = NULL;
+
+/* Remove the tracking state from the client 'c'. Note that there is not much
+ * to do for us here, if not to decrement the counter of the clients in
+ * tracking mode, because we just store the ID of the client in the tracking
+ * table, so we'll remove the ID reference in a lazy way. Otherwise when a
+ * client with many entries in the table is removed, it would cost a lot of
+ * time to do the cleanup. */
+void disableTracking(client *c) {
+ if (c->flags & CLIENT_TRACKING) {
+ server.tracking_clients--;
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ }
+}
+
+/* Enable the tracking state for the client 'c', and as a side effect allocates
+ * the tracking table if needed. If the 'redirect_to' argument is non zero, the
+ * invalidation messages for this client will be sent to the client ID
+ * specified by the 'redirect_to' argument. Note that if such client will
+ * eventually get freed, we'll send a message to the original client to
+ * inform it of the condition. Multiple clients can redirect the invalidation
+ * messages to the same client ID. */
+void enableTracking(client *c, uint64_t redirect_to) {
+ if (c->flags & CLIENT_TRACKING) return;
+ c->flags |= CLIENT_TRACKING;
+ c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
+ c->client_tracking_redirection = redirect_to;
+ server.tracking_clients++;
+ if (TrackingTable == NULL)
+ TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
+}
+
+/* This function is called after the excution of a readonly command in the
+ * case the client 'c' has keys tracking enabled. It will populate the
+ * tracking ivalidation table according to the keys the user fetched, so that
+ * Redis will know what are the clients that should receive an invalidation
+ * message with certain groups of keys are modified. */
+void trackingRememberKeys(client *c) {
+ int numkeys;
+ int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
+ if (keys == NULL) return;
+
+ for(int j = 0; j < numkeys; j++) {
+ int idx = keys[j];
+ sds sdskey = c->argv[idx]->ptr;
+ uint64_t hash = crc64(0,
+ (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+ if (TrackingTable[hash] == NULL)
+ TrackingTable[hash] = raxNew();
+ raxTryInsert(TrackingTable[hash],
+ (unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
+ }
+ getKeysFreeResult(keys);
+}
+
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of keys tracking, our task here is
+ * to send a notification to every client that may have keys about such . */
+void trackingInvalidateKey(robj *keyobj) {
+ sds sdskey = keyobj->ptr;
+ uint64_t hash = crc64(0,
+ (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+ if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
+
+ raxIterator ri;
+ raxStart(&ri,TrackingTable[hash]);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ uint64_t id;
+ memcpy(&id,ri.key,ri.key_len);
+ client *c = lookupClientByID(id);
+ if (c->client_tracking_redirection) {
+ client *redir = lookupClientByID(c->client_tracking_redirection);
+ if (!redir) {
+ /* We need to signal to the original connection that we
+ * are unable to send invalidation messages to the redirected
+ * connection, because the client no longer exist. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,3);
+ addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+ addReplyLongLong(c,c->client_tracking_redirection);
+ }
+ continue;
+ }
+ c = redir;
+ }
+
+ /* Only send such info for clients in RESP version 3 or more. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"invalidate",10);
+ addReplyLongLong(c,hash);
+ }
+ }
+ raxStop(&ri);
+
+ /* Free the tracking table: we'll create the radix tree and populate it
+ * again if more keys will be modified in this hash slot. */
+ raxFree(TrackingTable[hash]);
+ TrackingTable[hash] = NULL;
+}