summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/db.c1
-rw-r--r--src/server.h1
-rw-r--r--src/tracking.c81
3 files changed, 52 insertions, 31 deletions
diff --git a/src/db.c b/src/db.c
index 51f5a12b4..568e0b8de 100644
--- a/src/db.c
+++ b/src/db.c
@@ -417,6 +417,7 @@ void signalModifiedKey(redisDb *db, robj *key) {
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
+ if (server.tracking_clients) trackingInvalidateKeysOnFlush(dbid);
}
/*-----------------------------------------------------------------------------
diff --git a/src/server.h b/src/server.h
index f81b1010e..b200a6696 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1638,6 +1638,7 @@ void enableTracking(client *c, uint64_t redirect_to);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
+void trackingInvalidateKeysOnFlush(int dbid);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
diff --git a/src/tracking.c b/src/tracking.c
index bbfc66a72..f3ff7ed03 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -117,6 +117,40 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(keys);
}
+void sendTrackingMessage(client *c, long long hash) {
+ int using_redirection = 0;
+ if (c->client_tracking_redirection) {
+ client *redir = lookupClientByID(c->client_tracking_redirection);
+ if (!redir) {
+ /* We need to signal to the original connection that we
+ * are unable to send invalidation messages to the redirected
+ * connection, because the client no longer exist. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,3);
+ addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+ addReplyLongLong(c,c->client_tracking_redirection);
+ }
+ return;
+ }
+ c = redir;
+ using_redirection = 1;
+ }
+
+ /* Only send such info for clients in RESP version 3 or more. However
+ * if redirection is active, and the connection we redirect to is
+ * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+ * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+ if (c->resp > 2) {
+ addReplyPushLen(c,2);
+ addReplyBulkCBuffer(c,"invalidate",10);
+ addReplyLongLong(c,hash);
+ } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+ robj *msg = createStringObjectFromLongLong(hash);
+ addReplyPubsubMessage(c,TrackingChannelName,msg);
+ decrRefCount(msg);
+ }
+}
+
/* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such . */
@@ -134,37 +168,7 @@ void trackingInvalidateKey(robj *keyobj) {
memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id);
if (c == NULL) continue;
- int using_redirection = 0;
- if (c->client_tracking_redirection) {
- client *redir = lookupClientByID(c->client_tracking_redirection);
- if (!redir) {
- /* We need to signal to the original connection that we
- * are unable to send invalidation messages to the redirected
- * connection, because the client no longer exist. */
- if (c->resp > 2) {
- addReplyPushLen(c,3);
- addReplyBulkCBuffer(c,"tracking-redir-broken",21);
- addReplyLongLong(c,c->client_tracking_redirection);
- }
- continue;
- }
- c = redir;
- using_redirection = 1;
- }
-
- /* Only send such info for clients in RESP version 3 or more. However
- * if redirection is active, and the connection we redirect to is
- * in Pub/Sub mode, we can support the feature with RESP 2 as well,
- * by sending Pub/Sub messages in the __redis__:invalidate channel. */
- if (c->resp > 2) {
- addReplyPushLen(c,2);
- addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
- } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
- }
+ sendTrackingMessage(c,hash);
}
raxStop(&ri);
@@ -173,3 +177,18 @@ void trackingInvalidateKey(robj *keyobj) {
raxFree(TrackingTable[hash]);
TrackingTable[hash] = NULL;
}
+
+void trackingInvalidateKeysOnFlush(int dbid) {
+ UNUSED(dbid);
+ if (server.tracking_clients == 0) return;
+
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_TRACKING) {
+ sendTrackingMessage(c,-1);
+ }
+ }
+}