summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c32
1 files changed, 23 insertions, 9 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index cd7d9723e..e6e5da731 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1000,7 +1000,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
return -1;
}
- if (c == server.master || c->id == CLIENT_ID_AOF) {
+ if (mustObeyClient(c)) {
/* If command came from master or from AOF we must not enforce maxnodes
* (The maxlen/minid argument was re-written to make sure there's no
* inconsistency). */
@@ -1370,24 +1370,35 @@ void streamLastValidID(stream *s, streamID *maxid)
streamIteratorStop(&si);
}
+/* Maximum size for a stream ID string. In theory 20*2+1 should be enough,
+ * But to avoid chance for off by one issues and null-term, in case this will
+ * be used as parsing buffer, we use a slightly larger buffer. On the other
+ * hand considering sds header is gonna add 4 bytes, we wanna keep below the
+ * allocator's 48 bytes bin. */
+#define STREAM_ID_STR_LEN 44
+
+sds createStreamIDString(streamID *id) {
+ /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
+ sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
+ sdssetlen(str, 0);
+ return sdscatfmt(str,"%U-%U", id->ms,id->seq);
+}
+
/* Emit a reply in the client output buffer by formatting a Stream ID
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
void addReplyStreamID(client *c, streamID *id) {
- sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
- addReplyBulkSds(c,replyid);
+ addReplyBulkSds(c,createStreamIDString(id));
}
void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
- sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
- setDeferredReplyBulkSds(c, dr, replyid);
+ setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
}
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) {
- return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
- id->ms,id->seq));
+ return createObject(OBJ_STRING, createStreamIDString(id));
}
/* Returns non-zero if the ID is 0-0. */
@@ -2025,7 +2036,8 @@ void xaddCommand(client *c) {
addReplyError(c,"Elements are too large to be stored");
return;
}
- addReplyStreamID(c,&id);
+ sds replyid = createStreamIDString(&id);
+ addReplyBulkCBuffer(c, replyid, sdslen(replyid));
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
@@ -2050,9 +2062,11 @@ void xaddCommand(client *c) {
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
if (!parsed_args.id_given || !parsed_args.seq_given) {
- robj *idarg = createObjectFromStreamID(&id);
+ robj *idarg = createObject(OBJ_STRING, replyid);
rewriteClientCommandArgument(c, idpos, idarg);
decrRefCount(idarg);
+ } else {
+ sdsfree(replyid);
}
/* We need to signal to blocked clients that there is new data on this