summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorny0312 <49037844+ny0312@users.noreply.github.com>2021-05-29 23:20:32 -0700
committerGitHub <noreply@github.com>2021-05-30 09:20:32 +0300
commit53d1acd598b689b2bbc470d907b9e40e548d63f6 (patch)
tree5c2795c16dfced605f0eaadc7c289058ff5e5310 /src
parent501d7755831527b4237f9ed6050ec84203934e4d (diff)
downloadredis-53d1acd598b689b2bbc470d907b9e40e548d63f6.tar.gz
Always replicate time-to-live(TTL) as absolute timestamps in milliseconds (#8474)
Until now, on replica full-sync we transferred TTLs as absolute times; however, when a command arrived (EXPIRE or EXPIREAT), we propagated it as-is to replicas (possibly with a relative time), while always translating it to EXPIREAT (absolute time) for the AOF. This commit changes that and always uses absolute time for propagation. See the discussion in #8433. Furthermore, we introduce new commands, `EXPIRETIME`/`PEXPIRETIME`, that allow extracting the absolute TTL time from a key.
Diffstat (limited to 'src')
-rw-r--r--src/aof.c79
-rw-r--r--src/cluster.c7
-rw-r--r--src/expire.c25
-rw-r--r--src/server.c14
-rw-r--r--src/server.h6
-rw-r--r--src/t_string.c121
6 files changed, 109 insertions, 143 deletions
diff --git a/src/aof.c b/src/aof.c
index 35856f49b..f4209c6c5 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -575,43 +575,7 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
return dst;
}
-/* Create the sds representation of a PEXPIREAT command, using
- * 'seconds' as time to live and 'cmd' to understand what command
- * we are translating into a PEXPIREAT.
- *
- * This command is used in order to translate EXPIRE and PEXPIRE commands
- * into PEXPIREAT command so that we retain precision in the append only
- * file, and the time is always absolute and not relative. */
-sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
- long long when;
- robj *argv[3];
-
- /* Make sure we can use strtoll */
- seconds = getDecodedObject(seconds);
- when = strtoll(seconds->ptr,NULL,10);
- /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
- if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
- cmd->proc == expireatCommand)
- {
- when *= 1000;
- }
- /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
- if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
- cmd->proc == setexCommand || cmd->proc == psetexCommand)
- {
- when += mstime();
- }
- decrRefCount(seconds);
-
- argv[0] = shared.pexpireat;
- argv[1] = key;
- argv[2] = createStringObjectFromLongLong(when);
- buf = catAppendOnlyGenericCommand(buf, 3, argv);
- decrRefCount(argv[2]);
- return buf;
-}
-
-void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
@@ -624,44 +588,9 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
server.aof_selected_db = dictid;
}
- if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
- cmd->proc == expireatCommand) {
- /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
- buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
- } else if (cmd->proc == setCommand && argc > 3) {
- robj *pxarg = NULL;
- /* When SET is used with EX/PX argument setGenericCommand propagates them with PX millisecond argument.
- * So since the command arguments are re-written there, we can rely here on the index of PX being 3. */
- if (!strcasecmp(argv[3]->ptr, "px")) {
- pxarg = argv[4];
- }
- /* For AOF we convert SET key value relative time in milliseconds to SET key value absolute time in
- * millisecond. Whenever the condition is true it implies that original SET has been transformed
- * to SET PX with millisecond time argument so we do not need to worry about unit here.*/
- if (pxarg) {
- robj *millisecond = getDecodedObject(pxarg);
- long long when = strtoll(millisecond->ptr,NULL,10);
- when += mstime();
-
- decrRefCount(millisecond);
-
- robj *newargs[5];
- newargs[0] = argv[0];
- newargs[1] = argv[1];
- newargs[2] = argv[2];
- newargs[3] = shared.pxat;
- newargs[4] = createStringObjectFromLongLong(when);
- buf = catAppendOnlyGenericCommand(buf,5,newargs);
- decrRefCount(newargs[4]);
- } else {
- buf = catAppendOnlyGenericCommand(buf,argc,argv);
- }
- } else {
- /* All the other commands don't need translation or need the
- * same translation already operated in the command vector
- * for the replication itself. */
- buf = catAppendOnlyGenericCommand(buf,argc,argv);
- }
+ /* All commands should be propagated the same way in AOF as in replication.
+ * No need for AOF-specific translation. */
+ buf = catAppendOnlyGenericCommand(buf,argc,argv);
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
diff --git a/src/cluster.c b/src/cluster.c
index b29f3cdac..f34c33162 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -5206,6 +5206,13 @@ void restoreCommand(client *c) {
dbAdd(c->db,key,obj);
if (ttl) {
setExpire(c,c->db,key,ttl);
+ if (!absttl) {
+ /* Propagate TTL as absolute timestamp */
+ robj *ttl_obj = createStringObjectFromLongLong(ttl);
+ rewriteClientCommandArgument(c,2,ttl_obj);
+ decrRefCount(ttl_obj);
+ rewriteClientCommandArgument(c,c->argc,shared.absttl);
+ }
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,key);
diff --git a/src/expire.c b/src/expire.c
index 982301542..8996ae57e 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -539,6 +539,10 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
} else {
setExpire(c,c->db,key,when);
addReply(c,shared.cone);
+ /* Propagate as PEXPIREAT millisecond-timestamp */
+ robj *when_obj = createStringObjectFromLongLong(when);
+ rewriteClientCommandVector(c, 3, shared.pexpireat, key, when_obj);
+ decrRefCount(when_obj);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++;
@@ -566,8 +570,8 @@ void pexpireatCommand(client *c) {
expireGenericCommand(c,0,UNIT_MILLISECONDS);
}
-/* Implements TTL and PTTL */
-void ttlGenericCommand(client *c, int output_ms) {
+/* Implements TTL, PTTL and EXPIRETIME */
+void ttlGenericCommand(client *c, int output_ms, int output_abs) {
long long expire, ttl = -1;
/* If the key does not exist at all, return -2 */
@@ -575,11 +579,12 @@ void ttlGenericCommand(client *c, int output_ms) {
addReplyLongLong(c,-2);
return;
}
+
/* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */
expire = getExpire(c->db,c->argv[1]);
if (expire != -1) {
- ttl = expire-mstime();
+ ttl = output_abs ? expire : expire-mstime();
if (ttl < 0) ttl = 0;
}
if (ttl == -1) {
@@ -591,12 +596,22 @@ void ttlGenericCommand(client *c, int output_ms) {
/* TTL key */
void ttlCommand(client *c) {
- ttlGenericCommand(c, 0);
+ ttlGenericCommand(c, 0, 0);
}
/* PTTL key */
void pttlCommand(client *c) {
- ttlGenericCommand(c, 1);
+ ttlGenericCommand(c, 1, 0);
+}
+
+/* EXPIRETIME key */
+void expiretimeCommand(client *c) {
+ ttlGenericCommand(c, 0, 1);
+}
+
+/* PEXPIRETIME key */
+void pexpiretimeCommand(client *c) {
+ ttlGenericCommand(c, 1, 1);
}
/* PERSIST key */
diff --git a/src/server.c b/src/server.c
index 835872aeb..428e4aef1 100644
--- a/src/server.c
+++ b/src/server.c
@@ -800,6 +800,14 @@ struct redisCommand redisCommandTable[] = {
"read-only fast random @keyspace",
0,NULL,1,1,1,0,0,0},
+ {"expiretime",expiretimeCommand,2,
+ "read-only fast random @keyspace",
+ 0,NULL,1,1,1,0,0,0},
+
+ {"pexpiretime",pexpiretimeCommand,2,
+ "read-only fast random @keyspace",
+ 0,NULL,1,1,1,0,0,0},
+
{"persist",persistCommand,2,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
@@ -2601,7 +2609,6 @@ void createSharedObjects(void) {
shared.left = createStringObject("left",4);
shared.right = createStringObject("right",5);
shared.pxat = createStringObject("PXAT", 4);
- shared.px = createStringObject("PX",2);
shared.time = createStringObject("TIME",4);
shared.retrycount = createStringObject("RETRYCOUNT",10);
shared.force = createStringObject("FORCE",5);
@@ -2611,6 +2618,7 @@ void createSharedObjects(void) {
shared.ping = createStringObject("ping",4);
shared.setid = createStringObject("SETID",5);
shared.keepttl = createStringObject("KEEPTTL",7);
+ shared.absttl = createStringObject("ABSTTL",6);
shared.load = createStringObject("LOAD",4);
shared.createconsumer = createStringObject("CREATECONSUMER",14);
shared.getack = createStringObject("GETACK",6);
@@ -3574,6 +3582,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
+ UNUSED(cmd);
+
if (!server.replication_allowed)
return;
@@ -3590,7 +3600,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
- feedAppendOnlyFile(cmd,dbid,argv,argc);
+ feedAppendOnlyFile(dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
diff --git a/src/server.h b/src/server.h
index aa0f580c6..52778d1ed 100644
--- a/src/server.h
+++ b/src/server.h
@@ -983,7 +983,7 @@ struct sharedObjectsStruct {
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
- *time, *pxat, *px, *retrycount, *force, *justid,
+ *time, *pxat, *absttl, *retrycount, *force, *justid,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*select[PROTO_SHARED_SELECT_CMDS],
@@ -2068,7 +2068,7 @@ int bg_unlink(const char *filename);
/* AOF persistence */
void flushAppendOnlyFile(int force);
-void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
+void feedAppendOnlyFile(int dictid, robj **argv, int argc);
void aofRemoveTempFile(pid_t childpid);
int rewriteAppendOnlyFileBackground(void);
int loadAppendOnlyFile(char *filename);
@@ -2561,6 +2561,8 @@ void getsetCommand(client *c);
void ttlCommand(client *c);
void touchCommand(client *c);
void pttlCommand(client *c);
+void expiretimeCommand(client *c);
+void pexpiretimeCommand(client *c);
void persistCommand(client *c);
void replicaofCommand(client *c);
void roleCommand(client *c);
diff --git a/src/t_string.c b/src/t_string.c
index db6f7042e..99843c863 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -72,26 +72,13 @@ static int checkStringLength(client *c, long long size) {
#define OBJ_PXAT (1<<7) /* Set if timestamp in ms is given */
#define OBJ_PERSIST (1<<8) /* Set if we need to remove the ttl */
-void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
- long long milliseconds = 0, when = 0; /* initialized to avoid any harmness warning */
+/* Forward declaration */
+static int getExpireMillisecondsOrReply(client *c, robj *expire, int flags, int unit, long long *milliseconds);
- if (expire) {
- if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
- return;
- if (milliseconds <= 0 || (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) {
- /* Negative value provided or multiplication is gonna overflow. */
- addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
- return;
- }
- if (unit == UNIT_SECONDS) milliseconds *= 1000;
- when = milliseconds;
- if ((flags & OBJ_PX) || (flags & OBJ_EX))
- when += mstime();
- if (when <= 0) {
- /* Overflow detected. */
- addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
- return;
- }
+void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
+ long long milliseconds = 0; /* initialized to avoid any harmness warning */
+ if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK) {
+ return;
}
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
@@ -108,24 +95,17 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
genericSetKey(c,c->db,key, val,flags & OBJ_KEEPTTL,1);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
+
if (expire) {
- setExpire(c,c->db,key,when);
+ setExpire(c,c->db,key,milliseconds);
+ /* Propagate as SET Key Value PXAT millisecond-timestamp if there is
+ * EX/PX/EXAT/PXAT flag. */
+ robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
+ rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, milliseconds_obj);
+ decrRefCount(milliseconds_obj);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
-
- /* Propagate as SET Key Value PXAT millisecond-timestamp if there is EXAT/PXAT or
- * propagate as SET Key Value PX millisecond if there is EX/PX flag.
- *
- * Additionally when we propagate the SET with PX (relative millisecond) we translate
- * it again to SET with PXAT for the AOF.
- *
- * Additional care is required while modifying the argument order. AOF relies on the
- * exp argument being at index 3. (see feedAppendOnlyFile)
- * */
- robj *exp = (flags & OBJ_PXAT) || (flags & OBJ_EXAT) ? shared.pxat : shared.px;
- robj *millisecondObj = createStringObjectFromLongLong(milliseconds);
- rewriteClientCommandVector(c,5,shared.set,key,val,exp,millisecondObj);
- decrRefCount(millisecondObj);
}
+
if (!(flags & OBJ_SET_GET)) {
addReply(c, ok_reply ? ok_reply : shared.ok);
}
@@ -150,6 +130,45 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
}
}
+/*
+ * Extract the `expire` argument of a given GET/SET command as an absolute timestamp in milliseconds.
+ *
+ * "client" is the client that sent the `expire` argument.
+ * "expire" is the `expire` argument to be extracted.
+ * "flags" represents the behavior of the command (e.g. PX or EX).
+ * "unit" is the original unit of the given `expire` argument (e.g. UNIT_SECONDS).
+ * "milliseconds" is the output argument.
+ *
+ * On C_OK, the "milliseconds" output argument is set to the resulting absolute timestamp.
+ * On C_ERR, an error reply has already been added to the given client.
+ */
+static int getExpireMillisecondsOrReply(client *c, robj *expire, int flags, int unit, long long *milliseconds) {
+ int ret = getLongLongFromObjectOrReply(c, expire, milliseconds, NULL);
+ if (ret != C_OK) {
+ return ret;
+ }
+
+ if (*milliseconds <= 0 || (unit == UNIT_SECONDS && *milliseconds > LLONG_MAX / 1000)) {
+ /* Negative value provided or multiplication is gonna overflow. */
+ addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
+ return C_ERR;
+ }
+
+ if (unit == UNIT_SECONDS) *milliseconds *= 1000;
+
+ if ((flags & OBJ_PX) || (flags & OBJ_EX)) {
+ *milliseconds += mstime();
+ }
+
+ if (*milliseconds <= 0) {
+ /* Overflow detected. */
+ addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
+ return C_ERR;
+ }
+
+ return C_OK;
+}
+
#define COMMAND_GET 0
#define COMMAND_SET 1
/*
@@ -338,26 +357,10 @@ void getexCommand(client *c) {
return;
}
- long long milliseconds = 0, when = 0;
-
/* Validate the expiration time value first */
- if (expire) {
- if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
- return;
- if (milliseconds <= 0 || (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) {
- /* Negative value provided or multiplication is gonna overflow. */
- addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
- return;
- }
- if (unit == UNIT_SECONDS) milliseconds *= 1000;
- when = milliseconds;
- if ((flags & OBJ_PX) || (flags & OBJ_EX))
- when += mstime();
- if (when <= 0) {
- /* Overflow detected. */
- addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name);
- return;
- }
+ long long milliseconds = 0;
+ if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK) {
+ return;
}
/* We need to do this before we expire the key or delete it */
@@ -377,12 +380,12 @@ void getexCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
server.dirty++;
} else if (expire) {
- setExpire(c,c->db,c->argv[1],when);
- /* Propagate */
- robj *exp = (flags & OBJ_PXAT) || (flags & OBJ_EXAT) ? shared.pexpireat : shared.pexpire;
- robj* millisecondObj = createStringObjectFromLongLong(milliseconds);
- rewriteClientCommandVector(c,3,exp,c->argv[1],millisecondObj);
- decrRefCount(millisecondObj);
+ setExpire(c,c->db,c->argv[1],milliseconds);
+ /* Propagate as PEXPIREAT millisecond-timestamp if there is
+ * EX/PX/EXAT/PXAT flag and the key has not expired. */
+ robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
+ rewriteClientCommandVector(c,3,shared.pexpireat,c->argv[1],milliseconds_obj);
+ decrRefCount(milliseconds_obj);
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",c->argv[1],c->db->id);
server.dirty++;