summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-04-21 10:51:46 +0200
committerantirez <antirez@gmail.com>2020-04-21 10:51:46 +0200
commit94f2e7f9f9f7e6eca8f8bd7ae412c34806e68351 (patch)
tree2927805c22e495387d171df16cd530147245a02b
parentc7db333abb0e45ff8974ef2d1fc4f9ae1e7be1e2 (diff)
downloadredis-94f2e7f9f9f7e6eca8f8bd7ae412c34806e68351.tar.gz
Tracking: NOLOOP internals implementation.
-rw-r--r--src/bitops.c8
-rw-r--r--src/cluster.c4
-rw-r--r--src/db.c30
-rw-r--r--src/debug.c2
-rw-r--r--src/expire.c6
-rw-r--r--src/geo.c4
-rw-r--r--src/hyperloglog.c6
-rw-r--r--src/module.c10
-rw-r--r--src/server.h12
-rw-r--r--src/sort.c4
-rw-r--r--src/t_hash.c10
-rw-r--r--src/t_list.c20
-rw-r--r--src/t_set.c20
-rw-r--r--src/t_stream.c6
-rw-r--r--src/t_string.c14
-rw-r--r--src/t_zset.c12
-rw-r--r--src/tracking.c118
17 files changed, 174 insertions, 112 deletions
diff --git a/src/bitops.c b/src/bitops.c
index 496d78b66..f506a881b 100644
--- a/src/bitops.c
+++ b/src/bitops.c
@@ -554,7 +554,7 @@ void setbitCommand(client *c) {
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval;
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
@@ -754,11 +754,11 @@ void bitopCommand(client *c) {
/* Store the computed value into the target key */
if (maxlen) {
o = createObject(OBJ_STRING,res);
- setKey(c->db,targetkey,o);
+ setKey(c,c->db,targetkey,o);
notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id);
decrRefCount(o);
} else if (dbDelete(c->db,targetkey)) {
- signalModifiedKey(c->db,targetkey);
+ signalModifiedKey(c,c->db,targetkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id);
}
server.dirty++;
@@ -1135,7 +1135,7 @@ void bitfieldGeneric(client *c, int flags) {
}
if (changes) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty += changes;
}
diff --git a/src/cluster.c b/src/cluster.c
index 2377b386b..b103e2fe1 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4982,7 +4982,7 @@ void restoreCommand(client *c) {
setExpire(c,c->db,c->argv[1],ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",c->argv[1],c->db->id);
addReply(c,shared.ok);
server.dirty++;
@@ -5329,7 +5329,7 @@ try_again:
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
dbDelete(c->db,kv[j]);
- signalModifiedKey(c->db,kv[j]);
+ signalModifiedKey(c,c->db,kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
server.dirty++;
diff --git a/src/db.c b/src/db.c
index 59f0cc7a0..dc4a0b63e 100644
--- a/src/db.c
+++ b/src/db.c
@@ -238,8 +238,10 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* 3) The expire time of the key is reset (the key is made persistent),
* unless 'keepttl' is true.
*
- * All the new keys in the database should be created via this interface. */
-void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) {
+ * All the new keys in the database should be created via this interface.
+ * The client 'c' argument may be set to NULL if the operation is performed
+ * in a context where there is no clear client performing the operation. */
+void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
@@ -247,12 +249,12 @@ void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) {
}
incrRefCount(val);
if (!keepttl) removeExpire(db,key);
- if (signal) signalModifiedKey(db,key);
+ if (signal) signalModifiedKey(c,db,key);
}
/* Common case for genericSetKey() where the TTL is not retained. */
-void setKey(redisDb *db, robj *key, robj *val) {
- genericSetKey(db,key,val,0,1);
+void setKey(client *c, redisDb *db, robj *key, robj *val) {
+ genericSetKey(c,db,key,val,0,1);
}
/* Return true if the specified key exists in the specified database.
@@ -467,9 +469,11 @@ long long dbTotalServerKeyCount() {
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
-void signalModifiedKey(redisDb *db, robj *key) {
+/* Note that the 'c' argument may be NULL if the key was modified out of
+ * a context of a client. */
+void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
- trackingInvalidateKey(key);
+ trackingInvalidateKey(c,key);
}
void signalFlushedDb(int dbid) {
@@ -563,7 +567,7 @@ void delGenericCommand(client *c, int lazy) {
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
- signalModifiedKey(c->db,c->argv[j]);
+ signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
@@ -1003,8 +1007,8 @@ void renameGenericCommand(client *c, int nx) {
dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
- signalModifiedKey(c->db,c->argv[1]);
- signalModifiedKey(c->db,c->argv[2]);
+ signalModifiedKey(c,c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
@@ -1072,8 +1076,8 @@ void moveCommand(client *c) {
/* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]);
- signalModifiedKey(src,c->argv[1]);
- signalModifiedKey(dst,c->argv[1]);
+ signalModifiedKey(c,src,c->argv[1]);
+ signalModifiedKey(c,dst,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_from",c->argv[1],src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,
@@ -1317,7 +1321,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
"expired",key,db->id);
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
- if (retval) signalModifiedKey(db,key);
+ if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
diff --git a/src/debug.c b/src/debug.c
index 1351b2536..cbb56cb71 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -588,7 +588,7 @@ NULL
memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen);
}
dbAdd(c->db,key,val);
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
decrRefCount(key);
}
addReply(c,shared.ok);
diff --git a/src/expire.c b/src/expire.c
index c102a01ff..30a27193d 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
- trackingInvalidateKey(keyobj);
+ trackingInvalidateKey(NULL,keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
@@ -519,14 +519,14 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
/* Replicate/AOF this as an explicit DEL or UNLINK. */
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
rewriteClientCommandVector(c,2,aux,key);
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone);
return;
} else {
setExpire(c,c->db,key,when);
addReply(c,shared.cone);
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++;
return;
diff --git a/src/geo.c b/src/geo.c
index f7920a2e2..3e5d5f606 100644
--- a/src/geo.c
+++ b/src/geo.c
@@ -657,13 +657,13 @@ void georadiusGeneric(client *c, int flags) {
if (returned_items) {
zsetConvertToZiplistIfNeeded(zobj,maxelelen);
- setKey(c->db,storekey,zobj);
+ setKey(c,c->db,storekey,zobj);
decrRefCount(zobj);
notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey,
c->db->id);
server.dirty += returned_items;
} else if (dbDelete(c->db,storekey)) {
- signalModifiedKey(c->db,storekey);
+ signalModifiedKey(c,c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++;
}
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index facd99743..721f492a1 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -1209,7 +1209,7 @@ void pfaddCommand(client *c) {
}
hdr = o->ptr;
if (updated) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
server.dirty++;
HLL_INVALIDATE_CACHE(hdr);
@@ -1300,7 +1300,7 @@ void pfcountCommand(client *c) {
* data structure is not modified, since the cached value
* may be modified and given that the HLL is a Redis string
* we need to propagate the change. */
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
}
addReplyLongLong(c,card);
@@ -1373,7 +1373,7 @@ void pfmergeCommand(client *c) {
last hllSparseSet() call. */
HLL_INVALIDATE_CACHE(hdr);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
/* We generate a PFADD event for PFMERGE for semantical simplicity
* since in theory this is a mass-add of elements. */
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
diff --git a/src/module.c b/src/module.c
index 4d3d9e1af..e3a338dad 100644
--- a/src/module.c
+++ b/src/module.c
@@ -896,7 +896,7 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
* and client side caching). */
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
- signalModifiedKey(ctx->client->db,keyname);
+ signalModifiedKey(ctx->client,ctx->client->db,keyname);
return REDISMODULE_OK;
}
@@ -2016,7 +2016,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
- signalModifiedKey(key->db,key->key);
+ signalModifiedKey(key->ctx->client,key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
RM_ZsetRangeStop(key);
decrRefCount(key->key);
@@ -2157,7 +2157,7 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
- genericSetKey(key->db,key->key,str,0,0);
+ genericSetKey(key->ctx->client,key->db,key->key,str,0,0);
key->value = str;
return REDISMODULE_OK;
}
@@ -2237,7 +2237,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
if (key->value == NULL) {
/* Empty key: create it with the new size. */
robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
- genericSetKey(key->db,key->key,o,0,0);
+ genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
key->value = o;
decrRefCount(o);
} else {
@@ -3625,7 +3625,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
robj *o = createModuleObject(mt,value);
- genericSetKey(key->db,key->key,o,0,0);
+ genericSetKey(key->ctx->client,key->db,key->key,o,0,0);
decrRefCount(o);
key->value = o;
return REDISMODULE_OK;
diff --git a/src/server.h b/src/server.h
index d0d5ff154..d39359dce 100644
--- a/src/server.h
+++ b/src/server.h
@@ -252,7 +252,9 @@ typedef long long ustime_t; /* microsecond time type. */
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
depending on optin/optout mode. */
-#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */
+#define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages
+ about writes performed by myself.*/
+#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -1683,7 +1685,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
-void trackingInvalidateKey(robj *keyobj);
+void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
@@ -2071,8 +2073,8 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
-void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal);
-void setKey(redisDb *db, robj *key, robj *val);
+void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal);
+void setKey(client *c, redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db);
int dbSyncDelete(redisDb *db, robj *key);
@@ -2088,7 +2090,7 @@ void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);
-void signalModifiedKey(redisDb *db, robj *key);
+void signalModifiedKey(client *c, redisDb *db, robj *key);
void signalFlushedDb(int dbid);
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
unsigned int countKeysInSlot(unsigned int hashslot);
diff --git a/src/sort.c b/src/sort.c
index db26da158..f269a7731 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -570,12 +570,12 @@ void sortCommand(client *c) {
}
}
if (outputlen) {
- setKey(c->db,storekey,sobj);
+ setKey(c,c->db,storekey,sobj);
notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey,
c->db->id);
server.dirty += outputlen;
} else if (dbDelete(c->db,storekey)) {
- signalModifiedKey(c->db,storekey);
+ signalModifiedKey(c,c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++;
}
diff --git a/src/t_hash.c b/src/t_hash.c
index b9f0db7fc..866bcd25b 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -521,7 +521,7 @@ void hsetnxCommand(client *c) {
} else {
hashTypeSet(o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY);
addReply(c, shared.cone);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++;
}
@@ -551,7 +551,7 @@ void hsetCommand(client *c) {
/* HMSET */
addReply(c, shared.ok);
}
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++;
}
@@ -586,7 +586,7 @@ void hincrbyCommand(client *c) {
new = sdsfromlonglong(value);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyLongLong(c,value);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id);
server.dirty++;
}
@@ -625,7 +625,7 @@ void hincrbyfloatCommand(client *c) {
new = sdsnewlen(buf,len);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyBulkCBuffer(c,buf,len);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id);
server.dirty++;
@@ -721,7 +721,7 @@ void hdelCommand(client *c) {
}
}
if (deleted) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
diff --git a/src/t_list.c b/src/t_list.c
index eaeaa8e48..4770a2272 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -217,7 +217,7 @@ void pushGenericCommand(client *c, int where) {
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
@@ -247,7 +247,7 @@ void pushxGenericCommand(client *c, int where) {
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
@@ -292,7 +292,7 @@ void linsertCommand(client *c) {
listTypeReleaseIterator(iter);
if (inserted) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
@@ -355,7 +355,7 @@ void lsetCommand(client *c) {
addReply(c,shared.outofrangeerr);
} else {
addReply(c,shared.ok);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++;
}
@@ -382,7 +382,7 @@ void popGenericCommand(client *c, int where) {
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]);
}
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
}
}
@@ -482,7 +482,7 @@ void ltrimCommand(client *c) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
@@ -519,7 +519,7 @@ void lremCommand(client *c) {
listTypeReleaseIterator(li);
if (removed) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
}
@@ -555,7 +555,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
server.list_compress_depth);
dbAdd(c->db,dstkey,dstobj);
}
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
listTypePush(dstobj,value,LIST_HEAD);
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
/* Always send the pushed value to the client. */
@@ -593,7 +593,7 @@ void rpoplpushCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
touchedkey,c->db->id);
}
- signalModifiedKey(c->db,touchedkey);
+ signalModifiedKey(c,c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
if (c->cmd->proc == brpoplpushCommand) {
@@ -708,7 +708,7 @@ void blockingPopGenericCommand(client *c, int where) {
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
- signalModifiedKey(c->db,c->argv[j]);
+ signalModifiedKey(c,c->db,c->argv[j]);
server.dirty++;
/* Replicate it as an [LR]POP instead of B[LR]POP. */
diff --git a/src/t_set.c b/src/t_set.c
index 60cf22d8c..c2e73a6e6 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -280,7 +280,7 @@ void saddCommand(client *c) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
@@ -305,7 +305,7 @@ void sremCommand(client *c) {
}
}
if (deleted) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
@@ -358,8 +358,8 @@ void smoveCommand(client *c) {
dbAdd(c->db,c->argv[2],dstset);
}
- signalModifiedKey(c->db,c->argv[1]);
- signalModifiedKey(c->db,c->argv[2]);
+ signalModifiedKey(c,c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[2]);
server.dirty++;
/* An extra key has changed when ele was successfully added to dstset */
@@ -444,7 +444,7 @@ void spopWithCountCommand(client *c) {
/* Propagate this command as an DEL operation */
rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
return;
}
@@ -546,7 +546,7 @@ void spopWithCountCommand(client *c) {
* the alsoPropagate() API. */
decrRefCount(propargv[0]);
preventCommandPropagation(c);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
}
@@ -599,7 +599,7 @@ void spopCommand(client *c) {
}
/* Set has been modified */
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
}
@@ -808,7 +808,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
@@ -908,7 +908,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
server.dirty++;
} else {
setDeferredSetLen(c,replylen,cardinality);
@@ -1083,7 +1083,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
server.dirty++;
}
zfree(sets);
diff --git a/src/t_stream.c b/src/t_stream.c
index 155167af9..3efaa4509 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1262,7 +1262,7 @@ void xaddCommand(client *c) {
}
addReplyStreamID(c,&id);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
@@ -2390,7 +2390,7 @@ void xdelCommand(client *c) {
/* Propagate the write if needed. */
if (deleted) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
server.dirty += deleted;
}
@@ -2467,7 +2467,7 @@ void xtrimCommand(client *c) {
/* Propagate the write if needed. */
if (deleted) {
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
diff --git a/src/t_string.c b/src/t_string.c
index 335bda404..ef382bb0c 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -84,7 +84,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
- genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
+ genericSetKey(c,c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
@@ -183,7 +183,7 @@ void getCommand(client *c) {
void getsetCommand(client *c) {
if (getGenericCommand(c) == C_ERR) return;
c->argv[2] = tryObjectEncoding(c->argv[2]);
- setKey(c->db,c->argv[1],c->argv[2]);
+ setKey(c,c->db,c->argv[1],c->argv[2]);
notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[1],c->db->id);
server.dirty++;
}
@@ -240,7 +240,7 @@ void setrangeCommand(client *c) {
if (sdslen(value) > 0) {
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value));
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,
"setrange",c->argv[1],c->db->id);
server.dirty++;
@@ -328,7 +328,7 @@ void msetGenericCommand(client *c, int nx) {
for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
- setKey(c->db,c->argv[j],c->argv[j+1]);
+ setKey(c,c->db,c->argv[j],c->argv[j+1]);
notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[j],c->db->id);
}
server.dirty += (c->argc-1)/2;
@@ -373,7 +373,7 @@ void incrDecrCommand(client *c, long long incr) {
dbAdd(c->db,c->argv[1],new);
}
}
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++;
addReply(c,shared.colon);
@@ -423,7 +423,7 @@ void incrbyfloatCommand(client *c) {
dbOverwrite(c->db,c->argv[1],new);
else
dbAdd(c->db,c->argv[1],new);
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
server.dirty++;
addReplyBulk(c,new);
@@ -467,7 +467,7 @@ void appendCommand(client *c) {
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
totlen = sdslen(o->ptr);
}
- signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
server.dirty++;
addReplyLongLong(c,totlen);
diff --git a/src/t_zset.c b/src/t_zset.c
index 5c000e76f..9c409cd96 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1646,7 +1646,7 @@ reply_to_client:
cleanup:
zfree(scores);
if (added || updated) {
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
@@ -1681,7 +1681,7 @@ void zremCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
@@ -1779,7 +1779,7 @@ void zremrangeGenericCommand(client *c, int rangetype) {
/* Step 4: Notifications and reply. */
if (deleted) {
char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
@@ -2383,7 +2383,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
zsetConvertToZiplistIfNeeded(dstobj,maxelelen);
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj));
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_ZSET,
(op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id);
@@ -2392,7 +2392,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
decrRefCount(dstobj);
addReply(c,shared.czero);
if (touched) {
- signalModifiedKey(c->db,dstkey);
+ signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
server.dirty++;
}
@@ -3216,7 +3216,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
if (arraylen == 0) { /* Do this only for the first iteration. */
char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
- signalModifiedKey(c->db,key);
+ signalModifiedKey(c,c->db,key);
}
addReplyBulkCBuffer(c,ele,sdslen(ele));
diff --git a/src/tracking.c b/src/tracking.c
index 6f7929430..434e086b5 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -245,7 +245,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* matches one or more prefixes in the prefix table. Later when we
* return to the event loop, we'll send invalidation messages to the
* clients subscribed to each prefix. */
-void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
+void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
raxIterator ri;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
@@ -254,7 +254,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue;
bcastState *bs = ri.data;
- raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
+ /* We insert the client pointer as associated value in the radix
+ * tree. This way we know who was the client that did the last
+ * change to the key, and can avoid sending the notification in the
+ * case the client is in NOLOOP mode. */
+ raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,c,NULL);
}
raxStop(&ri);
}
@@ -262,13 +266,17 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
/* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such caching
- * slot. */
-void trackingInvalidateKey(robj *keyobj) {
+ * slot.
+ *
+ * Note that 'c' may be NULL in case the operation was performed outside the
+ * context of a client modifying the database (for instance when we delete a
+ * key because of expire). */
+void trackingInvalidateKey(client *c, robj *keyobj) {
if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
if (raxSize(PrefixTable) > 0)
- trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
+ trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey));
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
if (ids == raxNotFound) return;
@@ -279,19 +287,28 @@ void trackingInvalidateKey(robj *keyobj) {
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,sizeof(id));
- client *c = lookupClientByID(id);
+ client *target = lookupClientByID(id);
/* Note that if the client is in BCAST mode, we don't want to
* send invalidation messages that were pending in the case
* previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to
* BCAST mode. */
- if (c == NULL ||
- !(c->flags & CLIENT_TRACKING)||
- c->flags & CLIENT_TRACKING_BCAST)
+ if (target == NULL ||
+ !(target->flags & CLIENT_TRACKING)||
+ target->flags & CLIENT_TRACKING_BCAST)
{
continue;
}
- sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
+
+ /* If the client enabled the NOLOOP mode, don't send notifications
+ * about keys changed by the client itself. */
+ if (target->flags & CLIENT_TRACKING_NOLOOP &&
+ target == c)
+ {
+ continue;
+ }
+
+ sendTrackingMessage(target,sdskey,sdslen(sdskey),0);
}
raxStop(&ri);
@@ -383,6 +400,54 @@ void trackingLimitUsedSlots(void) {
timeout_counter++;
}
+/* Generate Redis protocol for an array containing all the key names
+ * in the 'keys' radix tree. If the client is not NULL, the list will not
+ * include keys that were modified the last time by this client, in order
+ * to implement the NOLOOP option.
+ *
+ * If the resultin array would be empty, NULL is returned instead. */
+sds trackingBuildBroadcastReply(client *c, rax *keys) {
+ raxIterator ri;
+ uint64_t count;
+
+ if (c == NULL) {
+ count = raxSize(keys);
+ } else {
+ count = 0;
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (ri.data != c) count++;
+ }
+ raxStop(&ri);
+
+ if (count == 0) return NULL;
+ }
+
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),count);
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,count*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ raxStart(&ri,keys);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ if (c && ri.data == c) continue;
+ len = ll2string(buf,sizeof(buf),ri.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri.key,ri.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri);
+ return proto;
+}
+
/* This function will run the prefixes of clients in BCAST mode and
* keys that were modified about each prefix, and will send the
* notifications to each client in each prefix. */
@@ -397,26 +462,10 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri)) {
bcastState *bs = ri.data;
if (raxSize(bs->keys)) {
- /* Create the array reply with the list of keys once, then send
- * it to all the clients subscribed to this prefix. */
- char buf[32];
- size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
- sds proto = sdsempty();
- proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
- proto = sdscatlen(proto,"*",1);
- proto = sdscatlen(proto,buf,len);
- proto = sdscatlen(proto,"\r\n",2);
- raxStart(&ri2,bs->keys);
- raxSeek(&ri2,"^",NULL,0);
- while(raxNext(&ri2)) {
- len = ll2string(buf,sizeof(buf),ri2.key_len);
- proto = sdscatlen(proto,"$",1);
- proto = sdscatlen(proto,buf,len);
- proto = sdscatlen(proto,"\r\n",2);
- proto = sdscatlen(proto,ri2.key,ri2.key_len);
- proto = sdscatlen(proto,"\r\n",2);
- }
- raxStop(&ri2);
+
+ /* Generate the common protocol for all the clients that are
+ * not using the NOLOOP option. */
+ sds proto = trackingBuildBroadcastReply(NULL,bs->keys);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
@@ -424,7 +473,14 @@ void trackingBroadcastInvalidationMessages(void) {
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
- sendTrackingMessage(c,proto,sdslen(proto),1);
+ if (c->flags & CLIENT_TRACKING_NOLOOP) {
+ /* This client may have certain keys excluded. */
+ sds adhoc = trackingBuildBroadcastReply(c,bs->keys);
+ sendTrackingMessage(c,adhoc,sdslen(adhoc),1);
+ sdsfree(adhoc);
+ } else {
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
}
raxStop(&ri2);