diff options
author | antirez <antirez@gmail.com> | 2018-10-08 12:05:20 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2018-10-08 12:05:22 +0200 |
commit | 3e78344d878fbbd6fdc566c210f5d834f7ac147e (patch) | |
tree | 4dcef3db9a3627b96edf3f1f165d6ee671ccbb50 | |
parent | e5f1de144807373b2f42f82bb44829a8a4cc85cd (diff) | |
download | redis-3e78344d878fbbd6fdc566c210f5d834f7ac147e.tar.gz |
Refactoring of XADD / XTRIM MAXLEN rewriting.
See #5141.
-rw-r--r-- | src/t_stream.c | 37 |
1 files changed, 15 insertions, 22 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 54d49bd28..4387e08a5 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1119,6 +1119,19 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin return streamGenericParseIDOrReply(c,o,id,missing_seq,1); } +/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream> + * otherwise trimming is no longer determinsitic on replicas / AOF. */ +void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + robj *equal_obj = createStringObject("=",1); + + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + + decrRefCount(equal_obj); + decrRefCount(maxlen_obj); +} + /* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */ void xaddCommand(client *c) { streamID id; @@ -1198,17 +1211,7 @@ void xaddCommand(client *c) { if (streamTrimByLength(s,maxlen,approx_maxlen)) { notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); } - - /* Rewrite approximated MAXLEN as specified s->length. */ - if (approx_maxlen) { - robj *maxlen_obj = createStringObjectFromLongLong(s->length); - rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); - decrRefCount(maxlen_obj); - - robj *equal_obj = createStringObject("=",1); - rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); - decrRefCount(equal_obj); - } + if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); } /* Let's rewrite the ID argument with the one actually generated for @@ -2262,17 +2265,7 @@ void xtrimCommand(client *c) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); server.dirty += deleted; - - /* Rewrite approximated MAXLEN as specified s->length. */ - if (approx_maxlen) { - robj *maxlen_obj = createStringObjectFromLongLong(s->length); - rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); - decrRefCount(maxlen_obj); - - robj *equal_obj = createStringObject("=",1); - rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); - decrRefCount(equal_obj); - } + if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); } addReplyLongLong(c,deleted); } |