summaryrefslogtreecommitdiff
path: root/src/aof.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aof.c')
-rw-r--r--src/aof.c69
1 files changed, 36 insertions, 33 deletions
diff --git a/src/aof.c b/src/aof.c
index 4744847d2..b9ade76a4 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -923,7 +923,7 @@ int rioWriteBulkObject(rio *r, robj *obj) {
/* Emit the commands needed to rebuild a list object.
* The function returns 0 on error, 1 on success. */
-int rewriteListObject(rio *r, robj *key, robj *o) {
+int rewriteListObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = listTypeLength(o);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
@@ -937,7 +937,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
AOF_REWRITE_ITEMS_PER_CMD : items;
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (entry.value) {
@@ -957,7 +957,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
/* Emit the commands needed to rebuild a set object.
* The function returns 0 on error, 1 on success. */
-int rewriteSetObject(rio *r, robj *key, robj *o) {
+int rewriteSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = setTypeSize(o);
if (o->encoding == OBJ_ENCODING_INTSET) {
@@ -971,7 +971,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkLongLong(r,llval) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
@@ -989,7 +989,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
@@ -1004,7 +1004,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {
/* Emit the commands needed to rebuild a sorted set object.
* The function returns 0 on error, 1 on success. */
-int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
+int rewriteSortedSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = zsetLength(o);
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -1030,7 +1030,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,score) == 0) return 0;
if (vstr != NULL) {
@@ -1057,7 +1057,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,*score) == 0) return 0;
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
@@ -1099,7 +1099,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
/* Emit the commands needed to rebuild a hash object.
* The function returns 0 on error, 1 on success. */
-int rewriteHashObject(rio *r, robj *key, robj *o) {
+int rewriteHashObject(rio *r, rkey *key, robj *o) {
hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o);
@@ -1111,7 +1111,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
@@ -1140,14 +1140,14 @@ int rioWriteBulkStreamID(rio *r,streamID *id) {
* add the message described by 'nack' having the id 'rawid', into the pending
* list of the specified consumer. All this in the context of the specified
* key and group. */
-int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
+int rioWriteStreamPendingEntry(rio *r, rkey *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
/* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid,&id);
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
@@ -1163,7 +1163,7 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t
/* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */
-int rewriteStreamObject(rio *r, robj *key, robj *o) {
+int rewriteStreamObject(rio *r, rkey *key, robj *o) {
stream *s = o->ptr;
streamIterator si;
streamIteratorStart(&si,s,NULL,NULL,0);
@@ -1179,7 +1179,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
@@ -1195,7 +1195,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* for the Stream type. */
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
@@ -1207,7 +1207,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
@@ -1222,7 +1222,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
- if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
@@ -1262,12 +1262,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
-int rewriteModuleObject(rio *r, robj *key, robj *o) {
+int rewriteModuleObject(rio *r, rkey *key, robj *o) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,r,key);
- mt->aof_rewrite(&io,key,mv->value);
+ robj *keyname = createStringObject(key->name,key->len);
+ moduleInitIOContext(io,mt,r,keyname);
+ mt->aof_rewrite(&io,keyname,mv->value);
+ decrRefCount(keyname);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
@@ -1309,15 +1311,14 @@ int rewriteAppendOnlyFileRio(rio *aof) {
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- sds keystr;
- robj key, *o;
+ rkey *key;
+ robj *o;
long long expiretime;
- keystr = dictGetKey(de);
+ key = dictGetKey(de);
o = dictGetVal(de);
- initStaticStringObject(key,keystr);
- expiretime = getExpire(db,&key);
+ expiretime = getExpire(key);
/* Save the key and associated value */
if (o->type == OBJ_STRING) {
@@ -1325,20 +1326,21 @@ int rewriteAppendOnlyFileRio(rio *aof) {
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
- if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkString(aof,key->name,key->len) == 0)
+ goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
- if (rewriteListObject(aof,&key,o) == 0) goto werr;
+ if (rewriteListObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
- if (rewriteSetObject(aof,&key,o) == 0) goto werr;
+ if (rewriteSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
- if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
+ if (rewriteSortedSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
- if (rewriteHashObject(aof,&key,o) == 0) goto werr;
+ if (rewriteHashObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
- if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
+ if (rewriteStreamObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
- if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
+ if (rewriteModuleObject(aof,key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
@@ -1346,7 +1348,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
- if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkString(aof,key->name,key->len) == 0)
+ goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */