diff options
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index cd7d9723e..e6e5da731 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1000,7 +1000,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i return -1; } - if (c == server.master || c->id == CLIENT_ID_AOF) { + if (mustObeyClient(c)) { /* If command came from master or from AOF we must not enforce maxnodes * (The maxlen/minid argument was re-written to make sure there's no * inconsistency). */ @@ -1370,24 +1370,35 @@ void streamLastValidID(stream *s, streamID *maxid) streamIteratorStop(&si); } +/* Maximum size for a stream ID string. In theory 20*2+1 should be enough, + * But to avoid chance for off by one issues and null-term, in case this will + * be used as parsing buffer, we use a slightly larger buffer. On the other + * hand considering sds header is gonna add 4 bytes, we wanna keep below the + * allocator's 48 bytes bin. */ +#define STREAM_ID_STR_LEN 44 + +sds createStreamIDString(streamID *id) { + /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */ + sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN); + sdssetlen(str, 0); + return sdscatfmt(str,"%U-%U", id->ms,id->seq); +} + /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard <ms>-<seq> format, using the simple string protocol * of REPL. */ void addReplyStreamID(client *c, streamID *id) { - sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - addReplyBulkSds(c,replyid); + addReplyBulkSds(c,createStreamIDString(id)); } void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { - sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - setDeferredReplyBulkSds(c, dr, replyid); + setDeferredReplyBulkSds(c, dr, createStreamIDString(id)); } /* Similar to the above function, but just creates an object, usually useful * for replication purposes to create arguments. */ robj *createObjectFromStreamID(streamID *id) { - return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U", - id->ms,id->seq)); + return createObject(OBJ_STRING, createStreamIDString(id)); } /* Returns non-zero if the ID is 0-0. */ @@ -2025,7 +2036,8 @@ void xaddCommand(client *c) { addReplyError(c,"Elements are too large to be stored"); return; } - addReplyStreamID(c,&id); + sds replyid = createStreamIDString(&id); + addReplyBulkCBuffer(c, replyid, sdslen(replyid)); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); @@ -2050,9 +2062,11 @@ void xaddCommand(client *c) { /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ if (!parsed_args.id_given || !parsed_args.seq_given) { - robj *idarg = createObjectFromStreamID(&id); + robj *idarg = createObject(OBJ_STRING, replyid); rewriteClientCommandArgument(c, idpos, idarg); decrRefCount(idarg); + } else { + sdsfree(replyid); } /* We need to signal to blocked clients that there is new data on this |