summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/blocked.c11
-rw-r--r--src/db.c2
-rw-r--r--src/server.c6
-rw-r--r--src/server.h2
-rw-r--r--src/t_stream.c31
-rw-r--r--tests/unit/type/stream.tcl12
6 files changed, 49 insertions, 15 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 2b43f2b75..b1731e552 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -262,6 +262,16 @@ void handleClientsBlockedOnKeys(void) {
* we can safely call signalKeyAsReady() against this key. */
dictDelete(rl->db->ready_keys,rl->key);
+ /* Even if we are not inside call(), increment the call depth
+ * in order to make sure that keys are expired against a fixed
+ * reference time, and not against the wallclock time. This
+ * way we can lookup an object multiple times (BRPOPLPUSH does
+ * that) without the risk of it being freed in the second
+ * lookup, invalidating the first one.
+ * See https://github.com/antirez/redis/pull/6554. */
+ server.fixed_time_expire++;
+ updateCachedTime(0);
+
/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == OBJ_LIST) {
@@ -457,6 +467,7 @@ void handleClientsBlockedOnKeys(void) {
}
}
}
+ server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
diff --git a/src/db.c b/src/db.c
index 3a66d2b45..e5a672a5c 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1151,7 +1151,7 @@ int keyIsExpired(redisDb *db, robj *key) {
* may re-open the same key multiple times, can invalidate an already
* open object in a next call, if the next call will see the key expired,
* while the first did not. */
- else if (server.call_depth > 0) {
+ else if (server.fixed_time_expire > 0) {
now = server.mstime;
}
/* For the other cases, we want to use the most fresh time we have. */
diff --git a/src/server.c b/src/server.c
index 7f735d638..f7fb4f882 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2043,7 +2043,7 @@ void initServer(void) {
server.hz = server.config_hz;
server.pid = getpid();
server.current_client = NULL;
- server.call_depth = 0;
+ server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
@@ -2436,7 +2436,7 @@ void call(client *c, int flags) {
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
- server.call_depth++;
+ server.fixed_time_expire++;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
@@ -2552,7 +2552,7 @@ void call(client *c, int flags) {
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
- server.call_depth--;
+ server.fixed_time_expire--;
server.stat_numcommands++;
}
diff --git a/src/server.h b/src/server.h
index 3c91d9680..f74e8f2f6 100644
--- a/src/server.h
+++ b/src/server.h
@@ -990,7 +990,7 @@ struct redisServer {
list *clients_pending_write; /* There is to write or install handler. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client executing the command. */
- long call_depth; /* call() re-entering count. */
+ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
diff --git a/src/t_stream.c b/src/t_stream.c
index d3d70062f..d1095bcc2 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -173,9 +173,19 @@ int streamCompareID(streamID *a, streamID *b) {
* C_ERR if an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
- /* If an ID was given, check that it's greater than the last entry ID
- * or return an error. */
- if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
+
+ /* Generate the new entry ID. */
+ streamID id;
+ if (use_id)
+ id = *use_id;
+ else
+ streamNextID(&s->last_id,&id);
+
+ /* Check that the new ID is greater than the last entry ID
+ * or return an error. Automatically generated IDs might
+ * overflow (and wrap-around) when incrementing the sequence
+ part. */
+ if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
/* Add the new entry. */
raxIterator ri;
@@ -192,13 +202,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
}
raxStop(&ri);
- /* Generate the new entry ID. */
- streamID id;
- if (use_id)
- id = *use_id;
- else
- streamNextID(&s->last_id,&id);
-
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
@@ -1216,6 +1219,14 @@ void xaddCommand(client *c) {
return;
}
+ /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
+ * a new stream and have streamAppendItem fail, leaving an empty key in the
+ * database. */
+ if (id_given && id.ms == 0 && id.seq == 0) {
+ addReplyError(c,"The ID specified in XADD must be greater than 0-0");
+ return;
+ }
+
/* Lookup the stream at key. */
robj *o;
stream *s;
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index ae6c2d7b8..8eaf36b4c 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -79,6 +79,12 @@ start_server {
assert {[streamCompareID $id2 $id3] == -1}
}
+ test {XADD IDs correctly report an error when overflowing} {
+ r DEL mystream
+ r xadd mystream 18446744073709551615-18446744073709551615 a b
+ assert_error ERR* {r xadd mystream * c d}
+ }
+
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {
@@ -117,6 +123,12 @@ start_server {
assert {[r xlen mystream] == $j}
}
+ test {XADD with ID 0-0} {
+ r DEL mystream
+ catch {r XADD mystream 0-0 k v} err
+ assert {[r EXISTS mystream] == 0}
+ }
+
test {XRANGE COUNT works as expected} {
assert {[llength [r xrange mystream - + COUNT 10]] == 10}
}