summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2020-05-04 10:50:22 +0200
committerGitHub <noreply@github.com>2020-05-04 10:50:22 +0200
commit68ad748e77e11f774c4e95f4d6ab7713712cd77c (patch)
tree47f885f583f9525b52e6f80bea9420a5e16e8b35
parentb3f3d5a42fec96bb062feb2cc6f5e79d7039b874 (diff)
parentbce3d08c66a1bf22ea852295a57b92a8024d4a1b (diff)
downloadredis-68ad748e77e11f774c4e95f4d6ab7713712cd77c.tar.gz
Merge pull request #7190 from guybe7/fix_consumer_seen_time
XPENDING should not update consumer's seen-time
-rw-r--r--src/blocked.c7
-rw-r--r--src/rdb.c4
-rw-r--r--src/stream.h7
-rw-r--r--src/t_stream.c35
4 files changed, 33 insertions, 20 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 045369e93..92f1cee65 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
int noack = 0;
if (group) {
- consumer = streamLookupConsumer(group,
- receiver->bpop.xread_consumer->ptr,
- 1);
+ consumer =
+ streamLookupConsumer(group,
+ receiver->bpop.xread_consumer->ptr,
+ SLC_NONE);
noack = receiver->bpop.xread_group_noack;
}
diff --git a/src/rdb.c b/src/rdb.c
index 14c5579e0..7ac2d43c2 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1851,8 +1851,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
decrRefCount(o);
return NULL;
}
- streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
- 1);
+ streamConsumer *consumer =
+ streamLookupConsumer(cgroup,cname,SLC_NONE);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
diff --git a/src/stream.h b/src/stream.h
index b69073994..0d3bf63fc 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -96,6 +96,11 @@ typedef struct streamPropInfo {
/* Prototypes of exported APIs. */
struct client;
+/* Flags for streamLookupConsumer */
+#define SLC_NONE 0
+#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
+#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
+
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
diff --git a/src/t_stream.c b/src/t_stream.c
index 5c1b9a523..676ddd9bb 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1570,7 +1570,8 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
- consumername->ptr,1);
+ consumername->ptr,
+ SLC_NONE);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
@@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
+ int create = !(flags & SLC_NOCREAT);
+ int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
@@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
- consumer->seen_time = mstime();
+ if (refresh) consumer->seen_time = mstime();
return consumer;
}
@@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
* may have pending messages: they are removed from the PEL, and the number
* of pending messages "lost" is returned. */
uint64_t streamDelConsumer(streamCG *cg, sds name) {
- streamConsumer *consumer = streamLookupConsumer(cg,name,0);
+ streamConsumer *consumer =
+ streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
if (consumer == NULL) return 0;
uint64_t retval = raxSize(consumer->pel);
@@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) {
}
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
- streamConsumer *consumer = consumername ?
- streamLookupConsumer(group,consumername->ptr,0):
- NULL;
-
- /* If a consumer name was mentioned but it does not exist, we can
- * just return an empty array. */
- if (consumername && consumer == NULL) {
- addReplyArrayLen(c,0);
- return;
+ streamConsumer *consumer = NULL;
+ if (consumername) {
+ consumer = streamLookupConsumer(group,
+ consumername->ptr,
+ SLC_NOCREAT|SLC_NOREFRESH);
+
+ /* If a consumer name was mentioned but it does not exist, we can
+ * just return an empty array. */
+ if (consumer == NULL) {
+ addReplyArrayLen(c,0);
+ return;
+ }
}
rax *pel = consumer ? consumer->pel : group->pel;
@@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
if (consumer == NULL)
- consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
+ consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
nack->consumer = consumer;
nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise