diff options
author | antirez <antirez@gmail.com> | 2020-04-17 15:46:43 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-04-17 15:51:39 +0200 |
commit | 5aa5e3af99092a5ad34a2d24ce4eb91c3f0c5227 (patch) | |
tree | e148e95ee7975094d9b945bc93cc5ffbdb65f42b /src/t_stream.c | |
parent | c479eace4512193bcfe3dcab3ab238486f6f9405 (diff) | |
download | redis-stream-propagation-fix.tar.gz |
Streams: use alsoPropagate() when in command context.stream-propagation-fix
Related to #7105.
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 155167af9..b4f84ed19 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -42,7 +42,7 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -824,8 +824,10 @@ robj *createObjectFromStreamID(streamID *id) { /* As a result of an explicit XCLAIM or XREADGROUP command, new entries * are created in the pending list of the stream and consumers. We need - * to propagate this changes in the form of XCLAIM commands. */ -void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { + * to propagate this changes in the form of XCLAIM commands. + * 'external' should be true if this command is called outside the + * function implementing a command. */ +void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack, int external) { /* We need to generate an XCLAIM that will work in a idempotent fashion: * * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> @@ -848,7 +850,12 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + if (external) + propagate(server.xclaimCommand,c->db->id,argv,14, + PROPAGATE_AOF|PROPAGATE_REPL); + else + alsoPropagate(server.xclaimCommand,c->db->id,argv,14, + PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -867,15 +874,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam * propagate the last ID just using the XCLAIM LASTID option, so we emit * * XGROUP SETID <key> <groupname> <id> - */ -void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { + * + * 'external' should be true if this command is called outside the + * function implementing a command. */ +void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname, int external) { robj *argv[5]; argv[0] = createStringObject("XGROUP",6); argv[1] = createStringObject("SETID",5); argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + if (external) + propagate(server.xgroupCommand,c->db->id,argv,5, + PROPAGATE_AOF|PROPAGATE_REPL); + else + alsoPropagate(server.xgroupCommand,c->db->id,argv,5, + PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); @@ -924,12 +938,16 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna * streamReplyWithRange() in order to emit single entries (found in the * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES * flag. + * + * Since this command also handles propagation, the last argument 'external' + * should be set to 1 if the function is called outside the context of + * a function implementing a Redis command. */ #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external) { void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; @@ -943,7 +961,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * as delivered. */ if (group && (flags & STREAM_RWR_HISTORY)) { return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, - consumer); + consumer,external); } if (!(flags & STREAM_RWR_RAWENTRIES)) @@ -1017,12 +1035,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Propagate as XCLAIM. */ if (spi) { robj *idarg = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); + streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack,external); decrRefCount(idarg); } } else { if (propagate_last_id) - streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + streamPropagateGroupID(c,spi->keyname,group,spi->groupname,external); } arraylen++; @@ -1045,8 +1063,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * This function is more expensive because it needs to inspect the PEL and then * seek into the radix tree of the messages in order to emit the full message * to the client. However clients only reach this code path when they are - * fetching the history of already retrieved messages, which is rare. */ -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { + * fetching the history of already retrieved messages, which is rare. + * + * See streamReplyWithRange() for the meaning of the last argument + * 'external'. This function is only called from streamReplyWithRange(). */ +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external) { raxIterator ri; unsigned char startkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)]; @@ -1062,7 +1083,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL) == 0) + STREAM_RWR_RAWENTRIES,NULL,external) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed @@ -1324,7 +1345,7 @@ void xrangeGenericCommand(client *c, int rev) { addReplyNullArray(c); } else { if (count == -1) count = 0; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,0); } } @@ -1563,7 +1584,7 @@ void xreadCommand(client *c) { if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, flags, &spi); + consumer, flags, &spi, 0); if (groups) server.dirty++; } } @@ -2341,19 +2362,19 @@ void xclaimCommand(client *c) { addReplyStreamID(c,&id); } else { size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, - NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); + NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,0); if (!emitted) addReplyNull(c); } arraylen++; /* Propagate this change. */ - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack,0); propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ server.dirty++; } } if (propagate_last_id) { - streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); + streamPropagateGroupID(c,c->argv[1],group,c->argv[2],0); server.dirty++; } setDeferredArrayLen(c,arraylenptr,arraylen); @@ -2586,11 +2607,11 @@ NULL end.ms = end.seq = UINT64_MAX; addReplyBulkCString(c,"first-entry"); count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,0); if (!count) addReplyNull(c); addReplyBulkCString(c,"last-entry"); count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,0); if (!count) addReplyNull(c); } else { addReplySubcommandSyntaxError(c); |