summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-10-16 16:48:31 +0200
committerantirez <antirez@gmail.com>2018-10-16 16:48:31 +0200
commitc1689166b71f298aec40db877e125c76c178b0ba (patch)
tree634f838b23247e6d080ad8a28007651681346b27 /src/aof.c
parentea78a1db329961d53ec420ac01ddb7cde42795ba (diff)
downloadredis-c1689166b71f298aec40db877e125c76c178b0ba.tar.gz
Streams: rewrite empty streams with XADD MAXLEN 0. Use XSETID.
Related to #5426.
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/aof.c b/src/aof.c
index 4877172a9..7d31183e8 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1158,22 +1158,28 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
}
- /* Append XSTREAM SETID after XADD, make sure lastid is correct,
- * in case of XDEL lastid. */
- if (rioWriteBulkCount(r,'*',4) == 0) return 0;
- if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
- if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
- if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
} else {
- /* Using XSTREAM CREATE if the stream is empty. */
- if (rioWriteBulkCount(r,'*',4) == 0) return 0;
- if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
- if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
+ /* Use the XADD MAXLEN 0 trick to generate an empty stream if
+ * the key we are serializing is an empty string, which is possible
+ * for the Stream type. */
+ if (rioWriteBulkCount(r,'*',7) == 0) return 0;
+ if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
- if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
+ if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
+ if (rioWriteBulkString(r,"0",1) == 0) return 0;
+ if (rioWriteBulkString(r,"*",1) == 0) return 0;
+ if (rioWriteBulkString(r,"x",1) == 0) return 0;
+ if (rioWriteBulkString(r,"y",1) == 0) return 0;
}
+ /* Append XSETID after XADD, make sure lastid is correct,
+ * in case of XDEL lastid. */
+ if (rioWriteBulkCount(r,'*',3) == 0) return 0;
+ if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
+
+
/* Create all the stream consumer groups. */
if (s->cgroups) {
raxIterator ri;