summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-29 18:32:38 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commite76fb4ab2546a6febc361d3b9dc4e76711f47f6b (patch)
treeb72cb9d84ae33373def22410b0fd1119289ac066
parentf3708af7f9ea6b3a097d2b33ee7c28624b753215 (diff)
downloadredis-e76fb4ab2546a6febc361d3b9dc4e76711f47f6b.tar.gz
CG: XPENDING should not create consumers and obey to count.
-rw-r--r--src/blocked.c3
-rw-r--r--src/stream.h2
-rw-r--r--src/t_stream.c16
3 files changed, 15 insertions, 6 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 8f472f2b8..2de798378 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -331,7 +331,8 @@ void handleClientsBlockedOnKeys(void) {
}
if (group) {
consumer = streamLookupConsumer(group,
- receiver->bpop.xread_consumer->ptr);
+ receiver->bpop.xread_consumer->ptr,
+ 1);
}
/* Note that after we unblock the client, 'gt'
diff --git a/src/stream.h b/src/stream.h
index 908e9ff72..bd999d77c 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -95,6 +95,6 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index c659a7017..95691157a 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1213,7 +1213,7 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[i+streams_arg]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
- consumername->ptr);
+ consumername->ptr,1);
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, noack);
@@ -1337,10 +1337,11 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
+ if (!create) return NULL;
consumer = zmalloc(sizeof(*consumer));
consumer->name = sdsdup(name);
consumer->pel = raxNew();
@@ -1542,8 +1543,14 @@ void xpendingCommand(client *c) {
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
streamConsumer *consumer = consumername ?
- streamLookupConsumer(group,consumername->ptr):
+ streamLookupConsumer(group,consumername->ptr,0):
NULL;
+
+ /* If a consumer name was mentioned but it does not exist, we can
+ * just return an empty array. */
+ if (consumername && consumer == NULL)
+ addReplyMultiBulkLen(c,0);
+
rax *pel = consumer ? consumer->pel : group->pel;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
@@ -1557,10 +1564,11 @@ void xpendingCommand(client *c) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
- while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
+ while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
streamNACK *nack = ri.data;
arraylen++;
+ count--;
addReplyMultiBulkLen(c,4);
/* Entry ID. */