summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c29
1 files changed, 19 insertions, 10 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index ae57202c1..0f0f97a1d 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -797,6 +797,16 @@ int streamDeleteItem(stream *s, streamID *id) {
return deleted;
}
+/* Get the last valid (non-tombstone) streamID of 's'. */
+void streamLastValidID(stream *s, streamID *maxid)
+{
+ streamIterator si;
+ streamIteratorStart(&si,s,NULL,NULL,1);
+ int64_t numfields;
+ streamIteratorGetID(&si,maxid,&numfields);
+ streamIteratorStop(&si);
+}
+
/* Emit a reply in the client output buffer by formatting a Stream ID
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
@@ -1506,20 +1516,23 @@ void xreadCommand(client *c) {
{
serve_synchronously = 1;
serve_history = 1;
- } else {
+ } else if (s->length) {
/* We also want to serve a consumer in a consumer group
* synchronously in case the group top item delivered is smaller
* than what the stream has inside. */
- streamID *last = &groups[i]->last_id;
- if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
+ streamID maxid, *last = &groups[i]->last_id;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, last) > 0) {
serve_synchronously = 1;
*gt = *last;
}
}
- } else {
+ } else if (s->length) {
/* For consumers without a group, we serve synchronously if we can
* actually provide at least one item from the stream. */
- if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
+ streamID maxid;
+ streamLastValidID(s, &maxid);
+ if (streamCompareID(&maxid, gt) > 0) {
serve_synchronously = 1;
}
}
@@ -1871,11 +1884,7 @@ void xsetidCommand(client *c) {
* item, otherwise the fundamental ID monotonicity assumption is violated. */
if (s->length > 0) {
streamID maxid;
- streamIterator si;
- streamIteratorStart(&si,s,NULL,NULL,1);
- int64_t numfields;
- streamIteratorGetID(&si,&maxid,&numfields);
- streamIteratorStop(&si);
+ streamLastValidID(s,&maxid);
if (streamCompareID(&id,&maxid) < 0) {
addReplyError(c,"The ID specified in XSETID is smaller than the "