diff options
Diffstat (limited to 'src/aof.c')
-rw-r--r-- | src/aof.c | 69 |
1 files changed, 36 insertions, 33 deletions
@@ -923,7 +923,7 @@ int rioWriteBulkObject(rio *r, robj *obj) { /* Emit the commands needed to rebuild a list object. * The function returns 0 on error, 1 on success. */ -int rewriteListObject(rio *r, robj *key, robj *o) { +int rewriteListObject(rio *r, rkey *key, robj *o) { long long count = 0, items = listTypeLength(o); if (o->encoding == OBJ_ENCODING_QUICKLIST) { @@ -937,7 +937,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) { AOF_REWRITE_ITEMS_PER_CMD : items; if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (entry.value) { @@ -957,7 +957,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) { /* Emit the commands needed to rebuild a set object. * The function returns 0 on error, 1 on success. */ -int rewriteSetObject(rio *r, robj *key, robj *o) { +int rewriteSetObject(rio *r, rkey *key, robj *o) { long long count = 0, items = setTypeSize(o); if (o->encoding == OBJ_ENCODING_INTSET) { @@ -971,7 +971,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"SADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkLongLong(r,llval) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; @@ -989,7 +989,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; if (rioWriteBulkString(r,"SADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; @@ -1004,7 +1004,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) { /* Emit the commands needed to rebuild a sorted set object. * The function returns 0 on error, 1 on success. */ -int rewriteSortedSetObject(rio *r, robj *key, robj *o) { +int rewriteSortedSetObject(rio *r, rkey *key, robj *o) { long long count = 0, items = zsetLength(o); if (o->encoding == OBJ_ENCODING_ZIPLIST) { @@ -1030,7 +1030,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkDouble(r,score) == 0) return 0; if (vstr != NULL) { @@ -1057,7 +1057,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteBulkDouble(r,*score) == 0) return 0; if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0; @@ -1099,7 +1099,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ -int rewriteHashObject(rio *r, robj *key, robj *o) { +int rewriteHashObject(rio *r, rkey *key, robj *o) { hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o); @@ -1111,7 +1111,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; } if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0; @@ -1140,14 +1140,14 @@ int rioWriteBulkStreamID(rio *r,streamID *id) { * add the message described by 'nack' having the id 'rawid', into the pending * list of the specified consumer. All this in the context of the specified * key and group. */ -int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) { +int rioWriteStreamPendingEntry(rio *r, rkey *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) { /* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> RETRYCOUNT <count> JUSTID FORCE. */ streamID id; streamDecodeID(rawid,&id); if (rioWriteBulkCount(r,'*',12) == 0) return 0; if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0; if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; @@ -1163,7 +1163,7 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t /* Emit the commands needed to rebuild a stream object. * The function returns 0 on error, 1 on success. */ -int rewriteStreamObject(rio *r, robj *key, robj *o) { +int rewriteStreamObject(rio *r, rkey *key, robj *o) { stream *s = o->ptr; streamIterator si; streamIteratorStart(&si,s,NULL,NULL,0); @@ -1179,7 +1179,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Emit the XADD <key> <id> ...fields... command. */ if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkStreamID(r,&id) == 0) return 0; while(numfields--) { unsigned char *field, *value; @@ -1195,7 +1195,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * for the Stream type. */ if (rioWriteBulkCount(r,'*',7) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; @@ -1207,7 +1207,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { * in case of XDEL lastid. */ if (rioWriteBulkCount(r,'*',3) == 0) return 0; if (rioWriteBulkString(r,"XSETID",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; @@ -1222,7 +1222,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',5) == 0) return 0; if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0; if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,key->name,key->len) == 0) return 0; if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0; if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0; @@ -1262,12 +1262,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Call the module type callback in order to rewrite a data type * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. */ -int rewriteModuleObject(rio *r, robj *key, robj *o) { +int rewriteModuleObject(rio *r, rkey *key, robj *o) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,r,key); - mt->aof_rewrite(&io,key,mv->value); + robj *keyname = createStringObject(key->name,key->len); + moduleInitIOContext(io,mt,r,keyname); + mt->aof_rewrite(&io,keyname,mv->value); + decrRefCount(keyname); if (io.ctx) { moduleFreeContext(io.ctx); zfree(io.ctx); @@ -1309,15 +1311,14 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - sds keystr; - robj key, *o; + rkey *key; + robj *o; long long expiretime; - keystr = dictGetKey(de); + key = dictGetKey(de); o = dictGetVal(de); - initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); + expiretime = getExpire(key); /* Save the key and associated value */ if (o->type == OBJ_STRING) { @@ -1325,20 +1326,21 @@ int rewriteAppendOnlyFileRio(rio *aof) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkString(aof,key->name,key->len) == 0) + goto werr; if (rioWriteBulkObject(aof,o) == 0) goto werr; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; + if (rewriteListObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; + if (rewriteStreamObject(aof,key,o) == 0) goto werr; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,key,o) == 0) goto werr; } else { serverPanic("Unknown object type"); } @@ -1346,7 +1348,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkString(aof,key->name,key->len) == 0) + goto werr; if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; } /* Read some diff from the parent process from time to time. */ |