diff options
author | Guy Benoish <guy.benoish@redislabs.com> | 2019-12-26 15:31:37 +0530 |
---|---|---|
committer | Guy Benoish <guy.benoish@redislabs.com> | 2019-12-26 15:31:37 +0530 |
commit | 1f75ce30dfdb1c8c4df8474f9580f2aff032cfb4 (patch) | |
tree | 2fa3174e9ec6d93e6af4e0802473d7be39c81f2a /src/t_stream.c | |
parent | 324e22accf457edc996971bc97f5474349cd7c4c (diff) | |
download | redis-1f75ce30dfdb1c8c4df8474f9580f2aff032cfb4.tar.gz |
Stream: Handle streamID-related edge cases
This commit solves several edge cases that are related to
exhausting the streamID limits: We should correctly calculate
the succeeding streamID instead of blindly incrementing 'seq'
This affects both XREAD and XADD.
Other (unrelated) changes:
Reply with a better error message when trying to add an entry
to a stream that has exhausted last_id
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index a499f7381..3d46ca0da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) { return s->length; } +/* Set 'id' to be its successor streamID */ +void streamIncrID(streamID *id) { + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the @@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) { new_id->ms = ms; new_id->seq = 0; } else { - new_id->ms = last_id->ms; - new_id->seq = last_id->seq+1; + *new_id = *last_id; + streamIncrID(new_id); } } @@ -1220,6 +1235,13 @@ void xaddCommand(client *c) { if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + /* Append using the low level function and return the ID. */ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) @@ -1509,7 +1531,7 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* uint64_t can't overflow in this context. */ + streamIncrID(&start); /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ |