summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-10-08 12:05:20 +0200
committerantirez <antirez@gmail.com>2018-10-08 12:05:22 +0200
commit3e78344d878fbbd6fdc566c210f5d834f7ac147e (patch)
tree4dcef3db9a3627b96edf3f1f165d6ee671ccbb50
parente5f1de144807373b2f42f82bb44829a8a4cc85cd (diff)
downloadredis-3e78344d878fbbd6fdc566c210f5d834f7ac147e.tar.gz
Refactoring of XADD / XTRIM MAXLEN rewriting.
See #5141.
-rw-r--r--src/t_stream.c37
1 files changed, 15 insertions, 22 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 54d49bd28..4387e08a5 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1119,6 +1119,19 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
+/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
+ * otherwise trimming is no longer determinsitic on replicas / AOF. */
+void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
+ robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+ robj *equal_obj = createStringObject("=",1);
+
+ rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
+ rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+
+ decrRefCount(equal_obj);
+ decrRefCount(maxlen_obj);
+}
+
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
@@ -1198,17 +1211,7 @@ void xaddCommand(client *c) {
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
-
- /* Rewrite approximated MAXLEN as specified s->length. */
- if (approx_maxlen) {
- robj *maxlen_obj = createStringObjectFromLongLong(s->length);
- rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
- decrRefCount(maxlen_obj);
-
- robj *equal_obj = createStringObject("=",1);
- rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
- decrRefCount(equal_obj);
- }
+ if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
/* Let's rewrite the ID argument with the one actually generated for
@@ -2262,17 +2265,7 @@ void xtrimCommand(client *c) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
-
- /* Rewrite approximated MAXLEN as specified s->length. */
- if (approx_maxlen) {
- robj *maxlen_obj = createStringObjectFromLongLong(s->length);
- rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
- decrRefCount(maxlen_obj);
-
- robj *equal_obj = createStringObject("=",1);
- rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
- decrRefCount(equal_obj);
- }
+ if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
addReplyLongLong(c,deleted);
}