diff options
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index ae57202c1..0f0f97a1d 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -797,6 +797,16 @@ int streamDeleteItem(stream *s, streamID *id) { return deleted; } +/* Get the last valid (non-tombstone) streamID of 's'. */ +void streamLastValidID(stream *s, streamID *maxid) +{ + streamIterator si; + streamIteratorStart(&si,s,NULL,NULL,1); + int64_t numfields; + streamIteratorGetID(&si,maxid,&numfields); + streamIteratorStop(&si); +} + /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard <ms>-<seq> format, using the simple string protocol * of REPL. */ @@ -1506,20 +1516,23 @@ void xreadCommand(client *c) { { serve_synchronously = 1; serve_history = 1; - } else { + } else if (s->length) { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller * than what the stream has inside. */ - streamID *last = &groups[i]->last_id; - if (s->length && (streamCompareID(&s->last_id, last) > 0)) { + streamID maxid, *last = &groups[i]->last_id; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, last) > 0) { serve_synchronously = 1; *gt = *last; } } - } else { + } else if (s->length) { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ - if (s->length && (streamCompareID(&s->last_id, gt) > 0)) { + streamID maxid; + streamLastValidID(s, &maxid); + if (streamCompareID(&maxid, gt) > 0) { serve_synchronously = 1; } } @@ -1871,11 +1884,7 @@ void xsetidCommand(client *c) { * item, otherwise the fundamental ID monotonicity assumption is violated. */ if (s->length > 0) { streamID maxid; - streamIterator si; - streamIteratorStart(&si,s,NULL,NULL,1); - int64_t numfields; - streamIteratorGetID(&si,&maxid,&numfields); - streamIteratorStop(&si); + streamLastValidID(s,&maxid); if (streamCompareID(&id,&maxid) < 0) { addReplyError(c,"The ID specified in XSETID is smaller than the " |