diff options
-rw-r--r-- | src/blocked.c | 11 | ||||
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/server.c | 6 | ||||
-rw-r--r-- | src/server.h | 2 | ||||
-rw-r--r-- | src/t_stream.c | 31 | ||||
-rw-r--r-- | tests/unit/type/stream.tcl | 12 |
6 files changed, 49 insertions, 15 deletions
diff --git a/src/blocked.c b/src/blocked.c index 2b43f2b75..b1731e552 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -262,6 +262,16 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + /* Even if we are not inside call(), increment the call depth + * in order to make sure that keys are expired against a fixed + * reference time, and not against the wallclock time. This + * way we can lookup an object multiple times (BRPOPLPUSH does + * that) without the risk of it being freed in the second + * lookup, invalidating the first one. + * See https://github.com/antirez/redis/pull/6554. */ + server.fixed_time_expire++; + updateCachedTime(0); + /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == OBJ_LIST) { @@ -457,6 +467,7 @@ void handleClientsBlockedOnKeys(void) { } } } + server.fixed_time_expire--; /* Free this item. */ decrRefCount(rl->key); @@ -1151,7 +1151,7 @@ int keyIsExpired(redisDb *db, robj *key) { * may re-open the same key multiple times, can invalidate an already * open object in a next call, if the next call will see the key expired, * while the first did not. */ - else if (server.call_depth > 0) { + else if (server.fixed_time_expire > 0) { now = server.mstime; } /* For the other cases, we want to use the most fresh time we have. */ diff --git a/src/server.c b/src/server.c index 7f735d638..f7fb4f882 100644 --- a/src/server.c +++ b/src/server.c @@ -2043,7 +2043,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; - server.call_depth = 0; + server.fixed_time_expire = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -2436,7 +2436,7 @@ void call(client *c, int flags) { int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; - server.call_depth++; + server.fixed_time_expire++; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ @@ -2552,7 +2552,7 @@ void call(client *c, int flags) { redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; - server.call_depth--; + server.fixed_time_expire--; server.stat_numcommands++; } diff --git a/src/server.h b/src/server.h index 3c91d9680..f74e8f2f6 100644 --- a/src/server.h +++ b/src/server.h @@ -990,7 +990,7 @@ struct redisServer { list *clients_pending_write; /* There is to write or install handler. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ - long call_depth; /* call() re-entering count. */ + long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ diff --git a/src/t_stream.c b/src/t_stream.c index d3d70062f..d1095bcc2 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -173,9 +173,19 @@ int streamCompareID(streamID *a, streamID *b) { * C_ERR if an ID was given via 'use_id', but adding it failed since the * current top ID is greater or equal. */ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { - /* If an ID was given, check that it's greater than the last entry ID - * or return an error. */ - if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR; + + /* Generate the new entry ID. */ + streamID id; + if (use_id) + id = *use_id; + else + streamNextID(&s->last_id,&id); + + /* Check that the new ID is greater than the last entry ID + * or return an error. Automatically generated IDs might + * overflow (and wrap-around) when incrementing the sequence + part. */ + if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR; /* Add the new entry. */ raxIterator ri; @@ -192,13 +202,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ } raxStop(&ri); - /* Generate the new entry ID. */ - streamID id; - if (use_id) - id = *use_id; - else - streamNextID(&s->last_id,&id); - /* We have to add the key into the radix tree in lexicographic order, * to do so we consider the ID as a single 128 bit number written in * big endian, so that the most significant bytes are the first ones. */ @@ -1216,6 +1219,14 @@ void xaddCommand(client *c) { return; } + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + * a new stream and have streamAppendItem fail, leaving an empty key in the + * database. */ + if (id_given && id.ms == 0 && id.seq == 0) { + addReplyError(c,"The ID specified in XADD must be greater than 0-0"); + return; + } + /* Lookup the stream at key. */ robj *o; stream *s; diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index ae6c2d7b8..8eaf36b4c 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -79,6 +79,12 @@ start_server { assert {[streamCompareID $id2 $id3] == -1} } + test {XADD IDs correctly report an error when overflowing} { + r DEL mystream + r xadd mystream 18446744073709551615-18446744073709551615 a b + assert_error ERR* {r xadd mystream * c d} + } + test {XADD with MAXLEN option} { r DEL mystream for {set j 0} {$j < 1000} {incr j} { @@ -117,6 +123,12 @@ start_server { assert {[r xlen mystream] == $j} } + test {XADD with ID 0-0} { + r DEL mystream + catch {r XADD mystream 0-0 k v} err + assert {[r EXISTS mystream] == 0} + } + test {XRANGE COUNT works as expected} { assert {[llength [r xrange mystream - + COUNT 10]] == 10} } |