From d082212051d77039c20d789248c383164d79bc85 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 3 Dec 2018 16:24:04 +0100 Subject: RESP3: t_stream.c updated. --- src/blocked.c | 8 ++++-- src/t_stream.c | 81 +++++++++++++++++++++++++++++++--------------------------- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index b8cf02c82..f9e196626 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -436,8 +436,12 @@ void handleClientsBlockedOnKeys(void) { * the name of the stream and the data we * extracted from it. Wrapped in a single-item * array, since we have just one key. */ - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); + if (receiver->resp == 2) { + addReplyArrayLen(receiver,1); + addReplyArrayLen(receiver,2); + } else { + addReplyMapLen(receiver,1); + } addReplyBulk(receiver,rl->key); streamPropInfo pi = { diff --git a/src/t_stream.c b/src/t_stream.c index f51f6c46b..1a5acac42 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -914,7 +914,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end } if (!(flags & STREAM_RWR_RAWENTRIES)) - arraylen_ptr = addDeferredMultiBulkLength(c); + arraylen_ptr = addReplyDeferredLen(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ @@ -925,9 +925,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ - addReplyMultiBulkLen(c,2); + addReplyArrayLen(c,2); addReplyStreamID(c,&id); - addReplyMultiBulkLen(c,numfields*2); + + addReplyMapLen(c,numfields); /* Emit the field-value pairs. */ while(numfields--) { @@ -993,7 +994,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (count && count == arraylen) break; } streamIteratorStop(&si); - if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); return arraylen; } @@ -1018,7 +1019,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start if (end) streamEncodeID(endkey,end); size_t arraylen = 0; - void *arraylen_ptr = addDeferredMultiBulkLength(c); + void *arraylen_ptr = addReplyDeferredLen(c); raxStart(&ri,consumer->pel); raxSeek(&ri,">=",startkey,sizeof(startkey)); while(raxNext(&ri) && (!count || arraylen < count)) { @@ -1032,11 +1033,11 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start * about a message that's no longer here because was removed * by the user by other means. In that case we signal it emitting * the ID but then a NULL entry for the fields. */ - addReplyMultiBulkLen(c,2); + addReplyArrayLen(c,2); streamID id; streamDecodeID(ri.key,&id); addReplyStreamID(c,&id); - addReply(c,shared.nullmultibulk); + addReplyNullArray(c); } else { streamNACK *nack = ri.data; nack->delivery_time = mstime(); @@ -1045,7 +1046,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start arraylen++; } raxStop(&ri); - setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + setDeferredArrayLen(c,arraylen_ptr,arraylen); return arraylen; } @@ -1286,12 +1287,13 @@ void xrangeGenericCommand(client *c, int rev) { } /* Return the specified range to the user. */ - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL - || checkType(c,o,OBJ_STREAM)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || + checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; if (count == 0) { - addReply(c,shared.nullmultibulk); + addReplyNullArray(c); } else { if (count == -1) count = 0; streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); @@ -1505,7 +1507,7 @@ void xreadCommand(client *c) { if (serve_synchronously) { arraylen++; - if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); + if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); /* streamReplyWithRange() handles the 'start' ID as inclusive, * so start from the next ID, since we want only messages with * IDs greater than start. */ @@ -1514,7 +1516,7 @@ void xreadCommand(client *c) { /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ - addReplyMultiBulkLen(c,2); + if (c->resp == 2) addReplyArrayLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], @@ -1532,7 +1534,10 @@ void xreadCommand(client *c) { /* We replied synchronously! Set the top array len and return to caller. */ if (arraylen) { - setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + if (c->resp == 2) + setDeferredArrayLen(c,arraylen_ptr,arraylen); + else + setDeferredMapLen(c,arraylen_ptr,arraylen); goto cleanup; } @@ -1541,7 +1546,7 @@ void xreadCommand(client *c) { /* If we are inside a MULTI/EXEC and the list is empty the only thing * we can do is treating it as a timeout (even with timeout 0). */ if (c->flags & CLIENT_MULTI) { - addReply(c,shared.nullmultibulk); + addReplyNullArray(c); goto cleanup; } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, @@ -1570,7 +1575,7 @@ void xreadCommand(client *c) { /* No BLOCK option, nor any stream we can serve. Reply as with a * timeout happened. */ - addReply(c,shared.nullmultibulk); + addReplyNullArray(c); /* Continue to cleanup... */ cleanup: /* Cleanup. */ @@ -1960,14 +1965,14 @@ void xpendingCommand(client *c) { /* XPENDING variant. */ if (justinfo) { - addReplyMultiBulkLen(c,4); + addReplyArrayLen(c,4); /* Total number of messages in the PEL. */ addReplyLongLong(c,raxSize(group->pel)); /* First and last IDs. */ if (raxSize(group->pel) == 0) { - addReply(c,shared.nullbulk); /* Start. */ - addReply(c,shared.nullbulk); /* End. */ - addReply(c,shared.nullmultibulk); /* Clients. */ + addReplyNull(c); /* Start. */ + addReplyNull(c); /* End. */ + addReplyNullArray(c); /* Clients. */ } else { /* Start. */ raxIterator ri; @@ -1987,17 +1992,17 @@ void xpendingCommand(client *c) { /* Consumers with pending messages. */ raxStart(&ri,group->consumers); raxSeek(&ri,"^",NULL,0); - void *arraylen_ptr = addDeferredMultiBulkLength(c); + void *arraylen_ptr = addReplyDeferredLen(c); size_t arraylen = 0; while(raxNext(&ri)) { streamConsumer *consumer = ri.data; if (raxSize(consumer->pel) == 0) continue; - addReplyMultiBulkLen(c,2); + addReplyArrayLen(c,2); addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyBulkLongLong(c,raxSize(consumer->pel)); arraylen++; } - setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + setDeferredArrayLen(c,arraylen_ptr,arraylen); raxStop(&ri); } } @@ -2010,7 +2015,7 @@ void xpendingCommand(client *c) { /* If a consumer name was mentioned but it does not exist, we can * just return an empty array. */ if (consumername && consumer == NULL) { - addReplyMultiBulkLen(c,0); + addReplyArrayLen(c,0); return; } @@ -2024,7 +2029,7 @@ void xpendingCommand(client *c) { streamEncodeID(endkey,&endid); raxStart(&ri,pel); raxSeek(&ri,">=",startkey,sizeof(startkey)); - void *arraylen_ptr = addDeferredMultiBulkLength(c); + void *arraylen_ptr = addReplyDeferredLen(c); size_t arraylen = 0; while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { @@ -2032,7 +2037,7 @@ void xpendingCommand(client *c) { arraylen++; count--; - addReplyMultiBulkLen(c,4); + addReplyArrayLen(c,4); /* Entry ID. */ streamID id; @@ -2052,7 +2057,7 @@ void xpendingCommand(client *c) { addReplyLongLong(c,nack->delivery_count); } raxStop(&ri); - setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + setDeferredArrayLen(c,arraylen_ptr,arraylen); } } @@ -2221,7 +2226,7 @@ void xclaimCommand(client *c) { /* Do the actual claiming. */ streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); - void *arraylenptr = addDeferredMultiBulkLength(c); + void *arraylenptr = addReplyDeferredLen(c); size_t arraylen = 0; for (int j = 5; j <= last_id_arg; j++) { streamID id; @@ -2284,7 +2289,7 @@ void xclaimCommand(client *c) { } else { size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); - if (!emitted) addReply(c,shared.nullbulk); + if (!emitted) addReplyNull(c); } arraylen++; @@ -2298,7 +2303,7 @@ void xclaimCommand(client *c) { streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); server.dirty++; } - setDeferredMultiBulkLength(c,arraylenptr,arraylen); + setDeferredArrayLen(c,arraylenptr,arraylen); preventCommandPropagation(c); } @@ -2463,7 +2468,7 @@ NULL return; } - addReplyMultiBulkLen(c,raxSize(cg->consumers)); + addReplyArrayLen(c,raxSize(cg->consumers)); raxIterator ri; raxStart(&ri,cg->consumers); raxSeek(&ri,"^",NULL,0); @@ -2473,7 +2478,7 @@ NULL mstime_t idle = now - consumer->seen_time; if (idle < 0) idle = 0; - addReplyMultiBulkLen(c,6); + addReplyMapLen(c,3); addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); addReplyBulkCString(c,"pending"); @@ -2485,17 +2490,17 @@ NULL } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { /* XINFO GROUPS . */ if (s->cgroups == NULL) { - addReplyMultiBulkLen(c,0); + addReplyArrayLen(c,0); return; } - addReplyMultiBulkLen(c,raxSize(s->cgroups)); + addReplyArrayLen(c,raxSize(s->cgroups)); raxIterator ri; raxStart(&ri,s->cgroups); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { streamCG *cg = ri.data; - addReplyMultiBulkLen(c,8); + addReplyMapLen(c,4); addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyBulkCString(c,"consumers"); @@ -2508,7 +2513,7 @@ NULL raxStop(&ri); } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { /* XINFO STREAM (or the alias XINFO ). */ - addReplyMultiBulkLen(c,14); + addReplyMapLen(c,7); addReplyBulkCString(c,"length"); addReplyLongLong(c,s->length); addReplyBulkCString(c,"radix-tree-keys"); @@ -2529,11 +2534,11 @@ NULL addReplyBulkCString(c,"first-entry"); count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReply(c,shared.nullbulk); + if (!count) addReplyNull(c); addReplyBulkCString(c,"last-entry"); count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReply(c,shared.nullbulk); + if (!count) addReplyNull(c); } else { addReplySubcommandSyntaxError(c); } -- cgit v1.2.1