summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaozhao.zz <zhaozhao.zz@alibaba-inc.com>2018-07-18 01:58:14 +0800
committerzhaozhao.zz <zhaozhao.zz@alibaba-inc.com>2018-08-01 10:34:12 +0800
commit9042d1c24966bf229b3fa8d94ada42ebebed7adf (patch)
tree38e168317be23fc574206cff5de13a1c697a29c1
parent14d6318b3225c010f28d26c5563c8140c86c1292 (diff)
downloadredis-9042d1c24966bf229b3fa8d94ada42ebebed7adf.tar.gz
Streams: propagate specified MAXLEN instead of approximated
Slaves and rebooting redis may have different radix tree struct, by different stream* config options. So propagating approximated MAXLEN to AOF/slaves may lead to date inconsistency.
-rw-r--r--src/t_stream.c41
1 files changed, 35 insertions, 6 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 7533ba7b0..7eaf0c547 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1114,8 +1114,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
-
-/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
+/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
@@ -1141,6 +1140,8 @@ void xaddCommand(client *c) {
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
@@ -1187,9 +1188,22 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
- /* Notify xtrim event if needed. */
- if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) {
- notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ if (maxlen >= 0) {
+ /* Notify xtrim event if needed. */
+ if (streamTrimByLength(s,maxlen,approx_maxlen)) {
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ }
+
+ /* Rewrite approximated MAXLEN as specified s->length. */
+ if (approx_maxlen) {
+ robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+ rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
+ decrRefCount(maxlen_obj);
+
+ robj *equal_obj = createStringObject("=",1);
+ rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+ decrRefCount(equal_obj);
+ }
}
/* Let's rewrite the ID argument with the one actually generated for
@@ -2174,7 +2188,7 @@ void xdelCommand(client *c) {
*
* List of options:
*
- * MAXLEN [~] <count> -- Trim so that the stream will be capped at
+ * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
@@ -2196,6 +2210,7 @@ void xtrimCommand(client *c) {
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
+ int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* Parse options. */
int i = 2; /* Start of options. */
@@ -2210,6 +2225,8 @@ void xtrimCommand(client *c) {
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
+ } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
+ i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
@@ -2219,6 +2236,7 @@ void xtrimCommand(client *c) {
return;
}
i++;
+ maxlen_arg_idx = i;
} else {
addReply(c,shared.syntaxerr);
return;
@@ -2239,6 +2257,17 @@ void xtrimCommand(client *c) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
+
+ /* Rewrite approximated MAXLEN as specified s->length. */
+ if (approx_maxlen) {
+ robj *maxlen_obj = createStringObjectFromLongLong(s->length);
+ rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
+ decrRefCount(maxlen_obj);
+
+ robj *equal_obj = createStringObject("=",1);
+ rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
+ decrRefCount(equal_obj);
+ }
}
addReplyLongLong(c,deleted);
}