summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-12-03 16:24:04 +0100
committerantirez <antirez@gmail.com>2018-12-21 11:42:51 +0100
commitd082212051d77039c20d789248c383164d79bc85 (patch)
tree695572a2a424cebc1c0934c9e154c2b962f40b2d
parentaa862c30e4bdaf727f665c5096fbdea7e33afe6a (diff)
downloadredis-d082212051d77039c20d789248c383164d79bc85.tar.gz
RESP3: t_stream.c updated.
-rw-r--r--src/blocked.c8
-rw-r--r--src/t_stream.c81
2 files changed, 49 insertions, 40 deletions
diff --git a/src/blocked.c b/src/blocked.c
index b8cf02c82..f9e196626 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -436,8 +436,12 @@ void handleClientsBlockedOnKeys(void) {
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
* array, since we have just one key. */
- addReplyArrayLen(receiver,1);
- addReplyArrayLen(receiver,2);
+ if (receiver->resp == 2) {
+ addReplyArrayLen(receiver,1);
+ addReplyArrayLen(receiver,2);
+ } else {
+ addReplyMapLen(receiver,1);
+ }
addReplyBulk(receiver,rl->key);
streamPropInfo pi = {
diff --git a/src/t_stream.c b/src/t_stream.c
index f51f6c46b..1a5acac42 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -914,7 +914,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
}
if (!(flags & STREAM_RWR_RAWENTRIES))
- arraylen_ptr = addDeferredMultiBulkLength(c);
+ arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
@@ -925,9 +925,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
- addReplyMultiBulkLen(c,2);
+ addReplyArrayLen(c,2);
addReplyStreamID(c,&id);
- addReplyMultiBulkLen(c,numfields*2);
+
+ addReplyMapLen(c,numfields);
/* Emit the field-value pairs. */
while(numfields--) {
@@ -993,7 +994,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
if (count && count == arraylen) break;
}
streamIteratorStop(&si);
- if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
@@ -1018,7 +1019,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
if (end) streamEncodeID(endkey,end);
size_t arraylen = 0;
- void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ void *arraylen_ptr = addReplyDeferredLen(c);
raxStart(&ri,consumer->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
while(raxNext(&ri) && (!count || arraylen < count)) {
@@ -1032,11 +1033,11 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
* about a message that's no longer here because was removed
* by the user by other means. In that case we signal it emitting
* the ID but then a NULL entry for the fields. */
- addReplyMultiBulkLen(c,2);
+ addReplyArrayLen(c,2);
streamID id;
streamDecodeID(ri.key,&id);
addReplyStreamID(c,&id);
- addReply(c,shared.nullmultibulk);
+ addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
@@ -1045,7 +1046,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
arraylen++;
}
raxStop(&ri);
- setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
@@ -1286,12 +1287,13 @@ void xrangeGenericCommand(client *c, int rev) {
}
/* Return the specified range to the user. */
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
- || checkType(c,o,OBJ_STREAM)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
+ checkType(c,o,OBJ_STREAM)) return;
+
s = o->ptr;
if (count == 0) {
- addReply(c,shared.nullmultibulk);
+ addReplyNullArray(c);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
@@ -1505,7 +1507,7 @@ void xreadCommand(client *c) {
if (serve_synchronously) {
arraylen++;
- if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
+ if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
/* streamReplyWithRange() handles the 'start' ID as inclusive,
* so start from the next ID, since we want only messages with
* IDs greater than start. */
@@ -1514,7 +1516,7 @@ void xreadCommand(client *c) {
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */
- addReplyMultiBulkLen(c,2);
+ if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
@@ -1532,7 +1534,10 @@ void xreadCommand(client *c) {
/* We replied synchronously! Set the top array len and return to caller. */
if (arraylen) {
- setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ if (c->resp == 2)
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
+ else
+ setDeferredMapLen(c,arraylen_ptr,arraylen);
goto cleanup;
}
@@ -1541,7 +1546,7 @@ void xreadCommand(client *c) {
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
- addReply(c,shared.nullmultibulk);
+ addReplyNullArray(c);
goto cleanup;
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
@@ -1570,7 +1575,7 @@ void xreadCommand(client *c) {
/* No BLOCK option, nor any stream we can serve. Reply as with a
* timeout happened. */
- addReply(c,shared.nullmultibulk);
+ addReplyNullArray(c);
/* Continue to cleanup... */
cleanup: /* Cleanup. */
@@ -1960,14 +1965,14 @@ void xpendingCommand(client *c) {
/* XPENDING <key> <group> variant. */
if (justinfo) {
- addReplyMultiBulkLen(c,4);
+ addReplyArrayLen(c,4);
/* Total number of messages in the PEL. */
addReplyLongLong(c,raxSize(group->pel));
/* First and last IDs. */
if (raxSize(group->pel) == 0) {
- addReply(c,shared.nullbulk); /* Start. */
- addReply(c,shared.nullbulk); /* End. */
- addReply(c,shared.nullmultibulk); /* Clients. */
+ addReplyNull(c); /* Start. */
+ addReplyNull(c); /* End. */
+ addReplyNullArray(c); /* Clients. */
} else {
/* Start. */
raxIterator ri;
@@ -1987,17 +1992,17 @@ void xpendingCommand(client *c) {
/* Consumers with pending messages. */
raxStart(&ri,group->consumers);
raxSeek(&ri,"^",NULL,0);
- void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ void *arraylen_ptr = addReplyDeferredLen(c);
size_t arraylen = 0;
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;
if (raxSize(consumer->pel) == 0) continue;
- addReplyMultiBulkLen(c,2);
+ addReplyArrayLen(c,2);
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkLongLong(c,raxSize(consumer->pel));
arraylen++;
}
- setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
raxStop(&ri);
}
}
@@ -2010,7 +2015,7 @@ void xpendingCommand(client *c) {
/* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */
if (consumername && consumer == NULL) {
- addReplyMultiBulkLen(c,0);
+ addReplyArrayLen(c,0);
return;
}
@@ -2024,7 +2029,7 @@ void xpendingCommand(client *c) {
streamEncodeID(endkey,&endid);
raxStart(&ri,pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
- void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ void *arraylen_ptr = addReplyDeferredLen(c);
size_t arraylen = 0;
while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
@@ -2032,7 +2037,7 @@ void xpendingCommand(client *c) {
arraylen++;
count--;
- addReplyMultiBulkLen(c,4);
+ addReplyArrayLen(c,4);
/* Entry ID. */
streamID id;
@@ -2052,7 +2057,7 @@ void xpendingCommand(client *c) {
addReplyLongLong(c,nack->delivery_count);
}
raxStop(&ri);
- setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ setDeferredArrayLen(c,arraylen_ptr,arraylen);
}
}
@@ -2221,7 +2226,7 @@ void xclaimCommand(client *c) {
/* Do the actual claiming. */
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
- void *arraylenptr = addDeferredMultiBulkLength(c);
+ void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
@@ -2284,7 +2289,7 @@ void xclaimCommand(client *c) {
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
- if (!emitted) addReply(c,shared.nullbulk);
+ if (!emitted) addReplyNull(c);
}
arraylen++;
@@ -2298,7 +2303,7 @@ void xclaimCommand(client *c) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
- setDeferredMultiBulkLength(c,arraylenptr,arraylen);
+ setDeferredArrayLen(c,arraylenptr,arraylen);
preventCommandPropagation(c);
}
@@ -2463,7 +2468,7 @@ NULL
return;
}
- addReplyMultiBulkLen(c,raxSize(cg->consumers));
+ addReplyArrayLen(c,raxSize(cg->consumers));
raxIterator ri;
raxStart(&ri,cg->consumers);
raxSeek(&ri,"^",NULL,0);
@@ -2473,7 +2478,7 @@ NULL
mstime_t idle = now - consumer->seen_time;
if (idle < 0) idle = 0;
- addReplyMultiBulkLen(c,6);
+ addReplyMapLen(c,3);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
addReplyBulkCString(c,"pending");
@@ -2485,17 +2490,17 @@ NULL
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
/* XINFO GROUPS <key>. */
if (s->cgroups == NULL) {
- addReplyMultiBulkLen(c,0);
+ addReplyArrayLen(c,0);
return;
}
- addReplyMultiBulkLen(c,raxSize(s->cgroups));
+ addReplyArrayLen(c,raxSize(s->cgroups));
raxIterator ri;
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
- addReplyMultiBulkLen(c,8);
+ addReplyMapLen(c,4);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkCString(c,"consumers");
@@ -2508,7 +2513,7 @@ NULL
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */
- addReplyMultiBulkLen(c,14);
+ addReplyMapLen(c,7);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
@@ -2529,11 +2534,11 @@ NULL
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
- if (!count) addReply(c,shared.nullbulk);
+ if (!count) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
- if (!count) addReply(c,shared.nullbulk);
+ if (!count) addReplyNull(c);
} else {
addReplySubcommandSyntaxError(c);
}