diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/blocked.c | 2 | ||||
-rw-r--r-- | src/stream.h | 1 | ||||
-rw-r--r-- | src/t_stream.c | 28 |
3 files changed, 27 insertions, 4 deletions
diff --git a/src/blocked.c b/src/blocked.c index 20c0e760a..06aa5850e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ + streamIncrID(&start); /* Lookup the consumer for the group, if any. */ streamConsumer *consumer = NULL; diff --git a/src/stream.h b/src/stream.h index 7de769ba1..b69073994 100644 --- a/src/stream.h +++ b/src/stream.h @@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(streamNACK *na); +void streamIncrID(streamID *id); #endif diff --git a/src/t_stream.c b/src/t_stream.c index a499f7381..3d46ca0da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) { return s->length; } +/* Set 'id' to be its successor streamID */ +void streamIncrID(streamID *id) { + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the @@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) { new_id->ms = ms; new_id->seq = 0; } else { - new_id->ms = last_id->ms; - new_id->seq = last_id->seq+1; + *new_id = *last_id; + streamIncrID(new_id); } } @@ -1220,6 +1235,13 @@ void xaddCommand(client *c) { if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + /* Append using the low level function and return the ID. */ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) @@ -1509,7 +1531,7 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* uint64_t can't overflow in this context. */ + streamIncrID(&start); /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ |