diff options
author | zhaozhao.zz <zhaozhao.zz@alibaba-inc.com> | 2018-10-08 21:23:38 +0800 |
---|---|---|
committer | zhaozhao.zz <zhaozhao.zz@alibaba-inc.com> | 2018-10-09 15:40:20 +0800 |
commit | b3e80d2f654a66358c53addffd34945363cce2bb (patch) | |
tree | d3ff08571ce24ebea08d3ff7f7a39c9561096c60 /src | |
parent | 5f3adbee33d555d436b03d19f32ce903da36252b (diff) | |
download | redis-b3e80d2f654a66358c53addffd34945363cce2bb.tar.gz |
Stream & AOF: rewrite stream in correct way
Diffstat (limited to 'src')
-rw-r--r-- | src/aof.c | 48 |
1 files changed, 32 insertions, 16 deletions
@@ -1121,23 +1121,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { streamID id; int64_t numfields; - /* Reconstruct the stream data using XADD commands. */ - while(streamIteratorGetID(&si,&id,&numfields)) { - /* Emit a two elements array for each item. The first is - * the ID, the second is an array of field-value pairs. */ - - /* Emit the XADD <key> <id> ...fields... command. */ - if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; - if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&id) == 0) return 0; - while(numfields--) { - unsigned char *field, *value; - int64_t field_len, value_len; - streamIteratorGetField(&si,&field,&value,&field_len,&value_len); - if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; - if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + if (s->length) { + /* Reconstruct the stream data using XADD commands. */ + while(streamIteratorGetID(&si,&id,&numfields)) { + /* Emit a two elements array for each item. The first is + * the ID, the second is an array of field-value pairs. */ + + /* Emit the XADD <key> <id> ...fields... command. */ + if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; + if (rioWriteBulkString(r,"XADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; + while(numfields--) { + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(&si,&field,&value,&field_len,&value_len); + if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; + if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + } } + /* Append XSTREAM SETID after XADD, make sure lastid is correct, + * in case of XDEL lastid. */ + if (rioWriteBulkCount(r,'*',4) == 0) return 0; + if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; + if (rioWriteBulkString(r,"SETID",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + } else { + /* Using XSTREAM CREATE if the stream is empty. */ + if (rioWriteBulkCount(r,'*',4) == 0) return 0; + if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; + if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; } /* Create all the stream consumer groups. */ |