summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2018-10-08 12:00:00 +0200
committerGitHub <noreply@github.com>2018-10-08 12:00:00 +0200
commite5f1de144807373b2f42f82bb44829a8a4cc85cd (patch)
tree9dd1124ce21f0c2123bf6ce51afc9e3157f64d9a
parent8507c3f26e3435de0d44122d9be206fd1b8265dc (diff)
parent60acac4cd02913385c461465d4cca06d6c015ba7 (diff)
downloadredis-e5f1de144807373b2f42f82bb44829a8a4cc85cd.tar.gz
Merge pull request #5141 from soloestoy/fix-xtrim-inconsistency
Fix XTRIM and XADD with MAXLEN inconsistency
-rw-r--r--src/t_stream.c53
-rw-r--r--tests/unit/type/stream.tcl46
2 files changed, 86 insertions, 13 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 09f3b37e5..54d49bd28 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1119,8 +1119,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
-
-/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
+/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
@@ -1140,11 +1139,14 @@ void xaddCommand(client *c) {
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
+ approx_maxlen = 0;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
@@ -1191,18 +1193,22 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
- /* Remove older elements if MAXLEN was specified. */
if (maxlen >= 0) {
- if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
- /* If no trimming was performed, for instance because approximated
- * trimming length was specified, rewrite the MAXLEN argument
- * as zero, so that the command is propagated without trimming. */
- robj *zeroobj = createStringObjectFromLongLong(0);
- rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj);
- decrRefCount(zeroobj);
- } else {
+ /* Notify xtrim event if needed. */
+ if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
+
+ /* Rewrite approximated MAXLEN as specified s->length. */
+ if (approx_maxlen) {
+ robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+ rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
+ decrRefCount(maxlen_obj);
+
+ robj *equal_obj = createStringObject("=",1);
+ rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+ decrRefCount(equal_obj);
+ }
}
/* Let's rewrite the ID argument with the one actually generated for
@@ -2187,7 +2193,7 @@ void xdelCommand(client *c) {
*
* List of options:
*
- * MAXLEN [~] <count> -- Trim so that the stream will be capped at
+ * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
@@ -2206,9 +2212,10 @@ void xtrimCommand(client *c) {
/* Argument parsing. */
int trim_strategy = TRIM_STRATEGY_NONE;
- long long maxlen = 0; /* 0 means no maximum length. */
+ long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
+ int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* Parse options. */
int i = 2; /* Start of options. */
@@ -2216,16 +2223,25 @@ void xtrimCommand(client *c) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (!strcasecmp(opt,"maxlen") && moreargs) {
+ approx_maxlen = 0;
trim_strategy = TRIM_STRATEGY_MAXLEN;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
+
+ if (maxlen < 0) {
+ addReplyError(c,"The MAXLEN argument must be >= 0.");
+ return;
+ }
i++;
+ maxlen_arg_idx = i;
} else {
addReply(c,shared.syntaxerr);
return;
@@ -2246,6 +2262,17 @@ void xtrimCommand(client *c) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
+
+ /* Rewrite approximated MAXLEN as specified s->length. */
+ if (approx_maxlen) {
+ robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+ rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
+ decrRefCount(maxlen_obj);
+
+ robj *equal_obj = createStringObject("=",1);
+ rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+ decrRefCount(equal_obj);
+ }
}
addReplyLongLong(c,deleted);
}
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 5cf6805d7..2b69a2e9e 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -317,3 +317,49 @@ start_server {
assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
}
}
+
+start_server {tags {"stream"} overrides {appendonly yes}} {
+ test {XADD with MAXLEN > xlen can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XADD mystream MAXLEN 200 * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ r debug loadaof
+ r XADD mystream * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes}} {
+ test {XADD with ~ MAXLEN can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XADD mystream MAXLEN ~ $j * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ r config set stream-node-max-entries 1
+ r debug loadaof
+ r XADD mystream * xitem v
+ incr j
+ assert {[r xlen mystream] == $j}
+ }
+}
+
+start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
+ test {XTRIM with ~ MAXLEN can propagate correctly} {
+ for {set j 0} {$j < 100} {incr j} {
+ r XADD mystream * xitem v
+ }
+ r XTRIM mystream MAXLEN ~ 85
+ assert {[r xlen mystream] == 89}
+ r config set stream-node-max-entries 1
+ r debug loadaof
+ r XADD mystream * xitem v
+ incr j
+ assert {[r xlen mystream] == 90}
+ }
+}