summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-15 12:37:04 +0200
committerantirez <antirez@gmail.com>2017-12-01 10:24:24 +0100
commit26d4f8e3ec74811076e8a71cd384ea89b10e0c13 (patch)
tree273aa7717b421d09a2daa9a97585127b0becab02 /src/aof.c
parent01ea018c4080e24b00d36e1cbf36c4d98b82ff40 (diff)
downloadredis-26d4f8e3ec74811076e8a71cd384ea89b10e0c13.tar.gz
Streams: AOF rewriting + minor iterator improvements.
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c33
1 files changed, 33 insertions, 0 deletions
diff --git a/src/aof.c b/src/aof.c
index 0593b2707..5fbfdd695 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1031,6 +1031,37 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
return 1;
}
+/* Emit the commands needed to rebuild a stream object.
+ * The function returns 0 on error, 1 on success. */
+int rewriteStreamObject(rio *r, robj *key, robj *o) {
+ streamIterator si;
+ streamIteratorStart(&si,o->ptr,NULL,NULL);
+ streamID id;
+ int64_t numfields;
+
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ /* Emit a two elements array for each item. The first is
+ * the ID, the second is an array of field-value pairs. */
+
+ /* Emit the XADD <key> <id> ...fields... command. */
+ if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
+ if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
+ if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0;
+ sdsfree(replyid);
+ while(numfields--) {
+ unsigned char *field, *value;
+ int64_t field_len, value_len;
+ streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
+ if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
+ if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+ }
+ }
+ streamIteratorStop(&si);
+ return 1;
+}
+
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
@@ -1111,6 +1142,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
+ } else if (o->type == OBJ_STREAM) {
+ if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {