summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
authorzhaozhao.zz <zhaozhao.zz@alibaba-inc.com>2018-10-08 21:23:38 +0800
committerzhaozhao.zz <zhaozhao.zz@alibaba-inc.com>2018-10-09 15:40:20 +0800
commitb3e80d2f654a66358c53addffd34945363cce2bb (patch)
treed3ff08571ce24ebea08d3ff7f7a39c9561096c60 /src/aof.c
parent5f3adbee33d555d436b03d19f32ce903da36252b (diff)
downloadredis-b3e80d2f654a66358c53addffd34945363cce2bb.tar.gz
Stream & AOF: rewrite stream in correct way
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c48
1 files changed, 32 insertions, 16 deletions
diff --git a/src/aof.c b/src/aof.c
index f8f26bdfe..3f914b772 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1121,23 +1121,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamID id;
int64_t numfields;
- /* Reconstruct the stream data using XADD commands. */
- while(streamIteratorGetID(&si,&id,&numfields)) {
- /* Emit a two elements array for each item. The first is
- * the ID, the second is an array of field-value pairs. */
-
- /* Emit the XADD <key> <id> ...fields... command. */
- if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
- if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
- if (rioWriteBulkStreamID(r,&id) == 0) return 0;
- while(numfields--) {
- unsigned char *field, *value;
- int64_t field_len, value_len;
- streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
- if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
- if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+ if (s->length) {
+ /* Reconstruct the stream data using XADD commands. */
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ /* Emit a two elements array for each item. The first is
+ * the ID, the second is an array of field-value pairs. */
+
+ /* Emit the XADD <key> <id> ...fields... command. */
+ if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
+ if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&id) == 0) return 0;
+ while(numfields--) {
+ unsigned char *field, *value;
+ int64_t field_len, value_len;
+ streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
+ if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
+ if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+ }
}
+ /* Append XSTREAM SETID after XADD, make sure lastid is correct,
+ * in case of XDEL lastid. */
+ if (rioWriteBulkCount(r,'*',4) == 0) return 0;
+ if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
+ if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
+ } else {
+ /* Using XSTREAM CREATE if the stream is empty. */
+ if (rioWriteBulkCount(r,'*',4) == 0) return 0;
+ if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
+ if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
}
/* Create all the stream consumer groups. */