summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c63
1 files changed, 42 insertions, 21 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 155167af9..b4f84ed19 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -42,7 +42,7 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
-size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@@ -824,8 +824,10 @@ robj *createObjectFromStreamID(streamID *id) {
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
- * to propagate this changes in the form of XCLAIM commands. */
-void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
+ * to propagate this changes in the form of XCLAIM commands.
+ * 'external' should be true if this command is called outside the
+ * function implementing a command. */
+void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack, int external) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
@@ -848,7 +850,12 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[11] = createStringObject("JUSTID",6);
argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
- propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
+ if (external)
+ propagate(server.xclaimCommand,c->db->id,argv,14,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ else
+ alsoPropagate(server.xclaimCommand,c->db->id,argv,14,
+ PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
@@ -867,15 +874,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
- */
-void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
+ *
+ * 'external' should be true if this command is called outside the
+ * function implementing a command. */
+void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname, int external) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("SETID",5);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
- alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ if (external)
+ propagate(server.xgroupCommand,c->db->id,argv,5,
+ PROPAGATE_AOF|PROPAGATE_REPL);
+ else
+ alsoPropagate(server.xgroupCommand,c->db->id,argv,5,
+ PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
@@ -924,12 +938,16 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
* streamReplyWithRange() in order to emit single entries (found in the
* PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
* flag.
+ *
+ * Since this command also handles propagation, the last argument 'external'
+ * should be set to 1 if the function is called outside the context of
+ * a function implementing a Redis command.
*/
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
@@ -943,7 +961,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* as delivered. */
if (group && (flags & STREAM_RWR_HISTORY)) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
- consumer);
+ consumer,external);
}
if (!(flags & STREAM_RWR_RAWENTRIES))
@@ -1017,12 +1035,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
- streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
+ streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack,external);
decrRefCount(idarg);
}
} else {
if (propagate_last_id)
- streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
+ streamPropagateGroupID(c,spi->keyname,group,spi->groupname,external);
}
arraylen++;
@@ -1045,8 +1063,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* This function is more expensive because it needs to inspect the PEL and then
* seek into the radix tree of the messages in order to emit the full message
* to the client. However clients only reach this code path when they are
- * fetching the history of already retrieved messages, which is rare. */
-size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
+ * fetching the history of already retrieved messages, which is rare.
+ *
+ * See streamReplyWithRange() for the meaning of the last argument
+ * 'external'. This function is only called from streamReplyWithRange(). */
+size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external) {
raxIterator ri;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
@@ -1062,7 +1083,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL) == 0)
+ STREAM_RWR_RAWENTRIES,NULL,external) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
* about a message that's no longer here because was removed
@@ -1324,7 +1345,7 @@ void xrangeGenericCommand(client *c, int rev) {
addReplyNullArray(c);
} else {
if (count == -1) count = 0;
- streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
+ streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,0);
}
}
@@ -1563,7 +1584,7 @@ void xreadCommand(client *c) {
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
- consumer, flags, &spi);
+ consumer, flags, &spi, 0);
if (groups) server.dirty++;
}
}
@@ -2341,19 +2362,19 @@ void xclaimCommand(client *c) {
addReplyStreamID(c,&id);
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
- NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
+ NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,0);
if (!emitted) addReplyNull(c);
}
arraylen++;
/* Propagate this change. */
- streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack,0);
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
}
}
if (propagate_last_id) {
- streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
+ streamPropagateGroupID(c,c->argv[1],group,c->argv[2],0);
server.dirty++;
}
setDeferredArrayLen(c,arraylenptr,arraylen);
@@ -2586,11 +2607,11 @@ NULL
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL);
+ STREAM_RWR_RAWENTRIES,NULL,0);
if (!count) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL);
+ STREAM_RWR_RAWENTRIES,NULL,0);
if (!count) addReplyNull(c);
} else {
addReplySubcommandSyntaxError(c);