summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/blocked.c2
-rw-r--r--src/stream.h1
-rw-r--r--src/t_stream.c28
3 files changed, 27 insertions, 4 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 20c0e760a..06aa5850e 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
if (streamCompareID(&s->last_id, gt) > 0) {
streamID start = *gt;
- start.seq++; /* Can't overflow, it's an uint64_t */
+ streamIncrID(&start);
/* Lookup the consumer for the group, if any. */
streamConsumer *consumer = NULL;
diff --git a/src/stream.h b/src/stream.h
index 7de769ba1..b69073994 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(streamNACK *na);
+void streamIncrID(streamID *id);
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index a499f7381..3d46ca0da 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) {
return s->length;
}
+/* Set 'id' to be its successor streamID */
+void streamIncrID(streamID *id) {
+ if (id->seq == UINT64_MAX) {
+ if (id->ms == UINT64_MAX) {
+ /* Special case where 'id' is the last possible streamID... */
+ id->ms = id->seq = 0;
+ } else {
+ id->ms++;
+ id->seq = 0;
+ }
+ } else {
+ id->seq++;
+ }
+}
+
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
@@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) {
new_id->ms = ms;
new_id->seq = 0;
} else {
- new_id->ms = last_id->ms;
- new_id->seq = last_id->seq+1;
+ *new_id = *last_id;
+ streamIncrID(new_id);
}
}
@@ -1220,6 +1235,13 @@ void xaddCommand(client *c) {
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
s = o->ptr;
+ /* Return ASAP if the stream has reached the last possible ID */
+ if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
+ addReplyError(c,"The stream has exhausted the last possible ID, "
+ "unable to add more items");
+ return;
+ }
+
/* Append using the low level function and return the ID. */
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL)
@@ -1509,7 +1531,7 @@ void xreadCommand(client *c) {
* so start from the next ID, since we want only messages with
* IDs greater than start. */
streamID start = *gt;
- start.seq++; /* uint64_t can't overflow in this context. */
+ streamIncrID(&start);
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */