summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2016-09-09 15:03:21 +0200
committerantirez <antirez@gmail.com>2016-09-09 15:03:21 +0200
commit3793afa0ba4a214c1e25ed74309d0594e7a1490c (patch)
tree971d8e50dc401692ebbf2bf72518d984fa8aec8e
parentf9624813aff019f95e370524152d130d22ee37f3 (diff)
parentdacb69ed007b9b1a0767cac420904b7af020f9e4 (diff)
downloadredis-3793afa0ba4a214c1e25ed74309d0594e7a1490c.tar.gz
Merge branch 'aofrdb' into unstable
-rw-r--r--redis.conf14
-rw-r--r--src/aof.c149
-rw-r--r--src/config.c9
-rw-r--r--src/rdb.c90
-rw-r--r--src/rdb.h4
-rw-r--r--src/server.c1
-rw-r--r--src/server.h5
-rw-r--r--tests/unit/aofrw.tcl101
8 files changed, 236 insertions, 137 deletions
diff --git a/redis.conf b/redis.conf
index b9217fdb4..a7b7f3a97 100644
--- a/redis.conf
+++ b/redis.conf
@@ -755,6 +755,20 @@ auto-aof-rewrite-min-size 64mb
# will be found.
aof-load-truncated yes
+# When rewriting the AOF file, Redis is able to use an RDB preamble in the
+# AOF file for faster rewrites and recoveries. When this option is turned
+# on the rewritten AOF file is composed of two different stanzas:
+#
+# [RDB file][AOF tail]
+#
+# When loading Redis recognizes that the AOF file starts with the "REDIS"
+# string and loads the prefixed RDB file, and continues loading the AOF
+# tail.
+#
+# This is currently turned off by default in order to avoid the surprise
+# of a format change, but will at some point be used as the default.
+aof-use-rdb-preamble no
+
################################ LUA SCRIPTING ###############################
# Max execution time of a Lua script in milliseconds.
diff --git a/src/aof.c b/src/aof.c
index 6a92a0cd9..5523066b5 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -616,19 +616,23 @@ int loadAppendOnlyFile(char *filename) {
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
- off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */
+ off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
+ if (fp == NULL) {
+ serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
+ exit(1);
+ }
+
+ /* Handle a zero-length AOF file as a special case. An emtpy AOF file
+ * is a valid AOF because an empty server with AOF enabled will create
+ * a zero length file at startup, that will remain like that if no write
+ * operation is received. */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return C_ERR;
}
- if (fp == NULL) {
- serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
- exit(1);
- }
-
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */
server.aof_state = AOF_OFF;
@@ -636,6 +640,28 @@ int loadAppendOnlyFile(char *filename) {
fakeClient = createFakeClient();
startLoading(fp);
+ /* Check if this AOF file has an RDB preamble. In that case we need to
+ * load the RDB file and later continue loading the AOF tail. */
+ char sig[5]; /* "REDIS" */
+ if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
+ /* No RDB preamble, seek back at 0 offset. */
+ if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
+ } else {
+ /* RDB preamble. Pass loading the RDB functions. */
+ rio rdb;
+
+ serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
+ if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
+ rioInitWithFile(&rdb,fp);
+ if (rdbLoadRio(&rdb) != C_OK) {
+ serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
+ goto readerr;
+ } else {
+ serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
+ }
+ }
+
+ /* Read the actual AOF file, in REPL format, command by command. */
while(1) {
int argc, j;
unsigned long len;
@@ -989,7 +1015,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
/* Call the module type callback in order to rewrite a data type
- * taht is exported by a module and is not handled by Redis itself.
+ * that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io;
@@ -1015,51 +1041,23 @@ ssize_t aofReadDiffFromParent(void) {
return total;
}
-/* Write a sequence of commands able to fully rebuild the dataset into
- * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
- *
- * In order to minimize the number of commands needed in the rewritten
- * log Redis uses variadic commands when possible, such as RPUSH, SADD
- * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
- * are inserted using a single command. */
-int rewriteAppendOnlyFile(char *filename) {
+int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
- rio aof;
- FILE *fp;
- char tmpfile[256];
- int j;
- long long now = mstime();
- char byte;
size_t processed = 0;
+ long long now = mstime();
+ int j;
- /* Note that we have to use a different temp name here compared to the
- * one used by rewriteAppendOnlyFileBackground() function. */
- snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
- fp = fopen(tmpfile,"w");
- if (!fp) {
- serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
- return C_ERR;
- }
-
- server.aof_child_diff = sdsempty();
- rioInitWithFile(&aof,fp);
- if (server.aof_rewrite_incremental_fsync)
- rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
- if (!di) {
- fclose(fp);
- return C_ERR;
- }
/* SELECT the new DB */
- if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
- if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
+ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
+ if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
@@ -1080,39 +1078,83 @@ int rewriteAppendOnlyFile(char *filename) {
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
- if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+ if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
- if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
- if (rioWriteBulkObject(&aof,o) == 0) goto werr;
+ if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
- if (rewriteListObject(&aof,&key,o) == 0) goto werr;
+ if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
- if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
+ if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
- if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
+ if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
- if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
+ if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
- if (rewriteModuleObject(&aof,&key,o) == 0) goto werr;
+ if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
- if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
- if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
- if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
+ if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+ if (rioWriteBulkObject(aof,&key) == 0) goto werr;
+ if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */
- if (aof.processed_bytes > processed+1024*10) {
- processed = aof.processed_bytes;
+ if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
+ processed = aof->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL;
}
+ return C_OK;
+
+werr:
+ if (di) dictReleaseIterator(di);
+ return C_ERR;
+}
+
+/* Write a sequence of commands able to fully rebuild the dataset into
+ * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
+ *
+ * In order to minimize the number of commands needed in the rewritten
+ * log Redis uses variadic commands when possible, such as RPUSH, SADD
+ * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
+ * are inserted using a single command. */
+int rewriteAppendOnlyFile(char *filename) {
+ rio aof;
+ FILE *fp;
+ char tmpfile[256];
+ char byte;
+
+ /* Note that we have to use a different temp name here compared to the
+ * one used by rewriteAppendOnlyFileBackground() function. */
+ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
+ fp = fopen(tmpfile,"w");
+ if (!fp) {
+ serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
+ return C_ERR;
+ }
+
+ server.aof_child_diff = sdsempty();
+ rioInitWithFile(&aof,fp);
+
+ if (server.aof_rewrite_incremental_fsync)
+ rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
+
+ if (server.aof_use_rdb_preamble) {
+ int error;
+ if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
+ errno = error;
+ goto werr;
+ }
+ } else {
+ if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
+ }
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
@@ -1178,7 +1220,6 @@ werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
- if (di) dictReleaseIterator(di);
return C_ERR;
}
diff --git a/src/config.c b/src/config.c
index dd21a0aca..1d81180b7 100644
--- a/src/config.c
+++ b/src/config.c
@@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) {
if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
+ if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@@ -954,6 +958,8 @@ void configSetCommand(client *c) {
} config_set_bool_field(
"aof-load-truncated",server.aof_load_truncated) {
} config_set_bool_field(
+ "aof-use-rdb-preamble",server.aof_use_rdb_preamble) {
+ } config_set_bool_field(
"slave-serve-stale-data",server.repl_serve_stale_data) {
} config_set_bool_field(
"slave-read-only",server.repl_slave_ro) {
@@ -1227,6 +1233,8 @@ void configGetCommand(client *c) {
server.aof_rewrite_incremental_fsync);
config_get_bool_field("aof-load-truncated",
server.aof_load_truncated);
+ config_get_bool_field("aof-use-rdb-preamble",
+ server.aof_use_rdb_preamble);
config_get_bool_field("lazyfree-lazy-eviction",
server.lazyfree_lazy_eviction);
config_get_bool_field("lazyfree-lazy-expire",
@@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC);
rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED);
+ rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION);
rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);
diff --git a/src/rdb.c b/src/rdb.c
index 58cde1f28..0cda23c5d 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb) {
+int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
+ int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
+ if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1;
}
@@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error) {
+int rdbSaveRio(rio *rdb, int *error, int flags) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
long long now = mstime();
uint64_t cksum;
+ size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) {
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
+
+ /* When this RDB is produced as part of an AOF rewrite, move
+ * accumulated diff from parent to child while rewriting in
+ * order to have a smaller final write. */
+ if (flags & RDB_SAVE_AOF_PREAMBLE &&
+ rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
+ {
+ processed = rdb->processed_bytes;
+ aofReadDiffFromParent();
+ }
}
dictReleaseIterator(di);
}
@@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@@ -955,7 +968,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
- if (rdbSaveRio(&rdb,&error) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
errno = error;
goto werr;
}
@@ -1373,67 +1386,61 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
-int rdbLoad(char *filename) {
+/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
+ * otherwise C_ERR is returned and 'errno' is set accordingly. */
+int rdbLoadRio(rio *rdb) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
long long expiretime, now = mstime();
- FILE *fp;
- rio rdb;
- if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
-
- rioInitWithFile(&rdb,fp);
- rdb.update_cksum = rdbLoadProgressCallback;
- rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
- if (rioRead(&rdb,buf,9) == 0) goto eoferr;
+ rdb->update_cksum = rdbLoadProgressCallback;
+ rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
+ if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
- fclose(fp);
serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL;
return C_ERR;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
- fclose(fp);
serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL;
return C_ERR;
}
- startLoading(fp);
while(1) {
robj *key, *val;
expiretime = -1;
/* Read type. */
- if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
+ if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
/* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to
* load the actual type, and continue. */
- if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
+ if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */
- if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
+ if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* the EXPIRETIME opcode specifies time in seconds, so convert
* into milliseconds. */
expiretime *= 1000;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */
- if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
+ if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */
- if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
+ if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
} else if (type == RDB_OPCODE_EOF) {
/* EOF: End of file, exit the main loop. */
break;
} else if (type == RDB_OPCODE_SELECTDB) {
/* SELECTDB: Select the specified database. */
- if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
+ if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if (dbid >= (unsigned)server.dbnum) {
serverLog(LL_WARNING,
@@ -1448,9 +1455,9 @@ int rdbLoad(char *filename) {
/* RESIZEDB: Hint about the size of the keys in the currently
* selected data base, in order to avoid useless rehashing. */
uint64_t db_size, expires_size;
- if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
+ if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
- if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
+ if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->dict,db_size);
dictExpand(db->expires,expires_size);
@@ -1462,8 +1469,8 @@ int rdbLoad(char *filename) {
*
* An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval;
- if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
- if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
+ if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
+ if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if (((char*)auxkey->ptr)[0] == '%') {
/* All the fields with a name staring with '%' are considered
@@ -1485,9 +1492,9 @@ int rdbLoad(char *filename) {
}
/* Read key */
- if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
+ if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */
- if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
+ if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
@@ -1508,9 +1515,9 @@ int rdbLoad(char *filename) {
}
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5 && server.rdb_checksum) {
- uint64_t cksum, expected = rdb.cksum;
+ uint64_t cksum, expected = rdb->cksum;
- if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
+ if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
memrev64ifbe(&cksum);
if (cksum == 0) {
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
@@ -1519,9 +1526,6 @@ int rdbLoad(char *filename) {
rdbExitReportCorruptRDB("RDB CRC error");
}
}
-
- fclose(fp);
- stopLoading();
return C_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@@ -1530,6 +1534,24 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
return C_ERR; /* Just to avoid warning */
}
+/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
+ * filename is open for reading and a rio stream object created in order
+ * to do the actual loading. Moreover the ETA displayed in the INFO
+ * output is initialized and finalized. */
+int rdbLoad(char *filename) {
+ FILE *fp;
+ rio rdb;
+ int retval;
+
+ if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
+ startLoading(fp);
+ rioInitWithFile(&rdb,fp);
+ retval = rdbLoadRio(&rdb);
+ fclose(fp);
+ stopLoading();
+ return retval;
+}
+
/* A background saving child (BGSAVE) terminated its work. Handle this.
* This function covers the case of actual BGSAVEs. */
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
diff --git a/src/rdb.h b/src/rdb.h
index a71ecb16e..cd1d65392 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -106,6 +106,9 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
+#define RDB_SAVE_NONE 0
+#define RDB_SAVE_AOF_PREAMBLE (1<<0)
+
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
int rdbSaveTime(rio *rdb, time_t t);
@@ -131,5 +134,6 @@ ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr);
int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
+int rdbLoadRio(rio *rdb);
#endif
diff --git a/src/server.c b/src/server.c
index c737a470f..e794ad132 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1345,6 +1345,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
+ server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
diff --git a/src/server.h b/src/server.h
index d410d5b2a..a5f0ee1a6 100644
--- a/src/server.h
+++ b/src/server.h
@@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_REWRITE_PERC 100
#define AOF_REWRITE_MIN_SIZE (64*1024*1024)
#define AOF_REWRITE_ITEMS_PER_CMD 64
+#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
#define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000
#define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128
#define CONFIG_DEFAULT_MAX_CLIENTS 10000
@@ -136,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof"
#define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
#define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1
+#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0
#define CONFIG_DEFAULT_ACTIVE_REHASHING 1
#define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
#define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0
@@ -900,6 +902,7 @@ struct redisServer {
int aof_last_write_status; /* C_OK or C_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
+ int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
/* AOF pipes used to communicate between parent and child during rewrite. */
int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
@@ -1365,6 +1368,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
+int rdbSaveRio(rio *rdb, int *error, int flags);
/* AOF persistence */
void flushAppendOnlyFile(int force);
@@ -1377,6 +1381,7 @@ int startAppendOnly(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
+ssize_t aofReadDiffFromParent(void);
/* Sorted sets data type */
diff --git a/tests/unit/aofrw.tcl b/tests/unit/aofrw.tcl
index 4fdbdc6c6..c5430eedc 100644
--- a/tests/unit/aofrw.tcl
+++ b/tests/unit/aofrw.tcl
@@ -4,60 +4,63 @@ start_server {tags {"aofrw"}} {
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
waitForBgrewriteaof r
- test {AOF rewrite during write load} {
- # Start a write load for 10 seconds
- set master [srv 0 client]
- set master_host [srv 0 host]
- set master_port [srv 0 port]
- set load_handle0 [start_write_load $master_host $master_port 10]
- set load_handle1 [start_write_load $master_host $master_port 10]
- set load_handle2 [start_write_load $master_host $master_port 10]
- set load_handle3 [start_write_load $master_host $master_port 10]
- set load_handle4 [start_write_load $master_host $master_port 10]
-
- # Make sure the instance is really receiving data
- wait_for_condition 50 100 {
- [r dbsize] > 0
- } else {
- fail "No write load detected."
- }
+ foreach rdbpre {yes no} {
+ r config set aof-use-rdb-preamble $rdbpre
+ test "AOF rewrite during write load: RDB preamble=$rdbpre" {
+ # Start a write load for 10 seconds
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ set load_handle0 [start_write_load $master_host $master_port 10]
+ set load_handle1 [start_write_load $master_host $master_port 10]
+ set load_handle2 [start_write_load $master_host $master_port 10]
+ set load_handle3 [start_write_load $master_host $master_port 10]
+ set load_handle4 [start_write_load $master_host $master_port 10]
+
+ # Make sure the instance is really receiving data
+ wait_for_condition 50 100 {
+ [r dbsize] > 0
+ } else {
+ fail "No write load detected."
+ }
- # After 3 seconds, start a rewrite, while the write load is still
- # active.
- after 3000
- r bgrewriteaof
- waitForBgrewriteaof r
+ # After 3 seconds, start a rewrite, while the write load is still
+ # active.
+ after 3000
+ r bgrewriteaof
+ waitForBgrewriteaof r
+
+ # Let it run a bit more so that we'll append some data to the new
+ # AOF.
+ after 1000
+
+ # Stop the processes generating the load if they are still active
+ stop_write_load $load_handle0
+ stop_write_load $load_handle1
+ stop_write_load $load_handle2
+ stop_write_load $load_handle3
+ stop_write_load $load_handle4
+
+ # Make sure that we remain the only connected client.
+ # This step is needed to make sure there are no pending writes
+ # that will be processed between the two "debug digest" calls.
+ wait_for_condition 50 100 {
+ [llength [split [string trim [r client list]] "\n"]] == 1
+ } else {
+ puts [r client list]
+ fail "Clients generating loads are not disconnecting"
+ }
- # Let it run a bit more so that we'll append some data to the new
- # AOF.
- after 1000
+ # Get the data set digest
+ set d1 [r debug digest]
- # Stop the processes generating the load if they are still active
- stop_write_load $load_handle0
- stop_write_load $load_handle1
- stop_write_load $load_handle2
- stop_write_load $load_handle3
- stop_write_load $load_handle4
+ # Load the AOF
+ r debug loadaof
+ set d2 [r debug digest]
- # Make sure that we remain the only connected client.
- # This step is needed to make sure there are no pending writes
- # that will be processed between the two "debug digest" calls.
- wait_for_condition 50 100 {
- [llength [split [string trim [r client list]] "\n"]] == 1
- } else {
- puts [r client list]
- fail "Clients generating loads are not disconnecting"
+ # Make sure they are the same
+ assert {$d1 eq $d2}
}
-
- # Get the data set digest
- set d1 [r debug digest]
-
- # Load the AOF
- r debug loadaof
- set d2 [r debug digest]
-
- # Make sure they are the same
- assert {$d1 eq $d2}
}
}