diff options
-rw-r--r-- | redis.conf | 14 | ||||
-rw-r--r-- | src/aof.c | 149 | ||||
-rw-r--r-- | src/config.c | 9 | ||||
-rw-r--r-- | src/rdb.c | 90 | ||||
-rw-r--r-- | src/rdb.h | 4 | ||||
-rw-r--r-- | src/server.c | 1 | ||||
-rw-r--r-- | src/server.h | 5 | ||||
-rw-r--r-- | tests/unit/aofrw.tcl | 101 |
8 files changed, 236 insertions, 137 deletions
diff --git a/redis.conf b/redis.conf index b9217fdb4..a7b7f3a97 100644 --- a/redis.conf +++ b/redis.conf @@ -755,6 +755,20 @@ auto-aof-rewrite-min-size 64mb # will be found. aof-load-truncated yes +# When rewriting the AOF file, Redis is able to use an RDB preamble in the +# AOF file for faster rewrites and recoveries. When this option is turned +# on the rewritten AOF file is composed of two different stanzas: +# +# [RDB file][AOF tail] +# +# When loading, Redis recognizes that the AOF file starts with the "REDIS" +# string and loads the prefixed RDB file, and continues loading the AOF +# tail. +# +# This is currently turned off by default in order to avoid the surprise +# of a format change, but will at some point be used as the default. +aof-use-rdb-preamble no + ################################ LUA SCRIPTING ############################### # Max execution time of a Lua script in milliseconds. @@ -616,19 +616,23 @@ int loadAppendOnlyFile(char *filename) { struct redis_stat sb; int old_aof_state = server.aof_state; long loops = 0; - off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */ + off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */ + if (fp == NULL) { + serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); + exit(1); + } + + /* Handle a zero-length AOF file as a special case. An empty AOF file + * is a valid AOF because an empty server with AOF enabled will create + * a zero length file at startup, that will remain like that if no write + * operation is received. */ if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; fclose(fp); return C_ERR; } - if (fp == NULL) { - serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); - exit(1); - } - /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI * to the same file we're about to read. 
*/ server.aof_state = AOF_OFF; @@ -636,6 +640,28 @@ int loadAppendOnlyFile(char *filename) { fakeClient = createFakeClient(); startLoading(fp); + /* Check if this AOF file has an RDB preamble. In that case we need to + * load the RDB file and later continue loading the AOF tail. */ + char sig[5]; /* "REDIS" */ + if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) { + /* No RDB preamble, seek back to offset 0. */ + if (fseek(fp,0,SEEK_SET) == -1) goto readerr; + } else { + /* RDB preamble. Pass loading to the RDB functions. */ + rio rdb; + + serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); + if (fseek(fp,0,SEEK_SET) == -1) goto readerr; + rioInitWithFile(&rdb,fp); + if (rdbLoadRio(&rdb) != C_OK) { + serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); + goto readerr; + } else { + serverLog(LL_NOTICE,"Reading the remaining AOF tail..."); + } + } + + /* Read the actual AOF file, in REPL format, command by command. */ while(1) { int argc, j; unsigned long len; @@ -989,7 +1015,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { } /* Call the module type callback in order to rewrite a data type - * taht is exported by a module and is not handled by Redis itself. + * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. */ int rewriteModuleObject(rio *r, robj *key, robj *o) { RedisModuleIO io; @@ -1015,51 +1041,23 @@ ssize_t aofReadDiffFromParent(void) { return total; } -/* Write a sequence of commands able to fully rebuild the dataset into - * "filename". Used both by REWRITEAOF and BGREWRITEAOF. - * - * In order to minimize the number of commands needed in the rewritten - * log Redis uses variadic commands when possible, such as RPUSH, SADD - * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time - * are inserted using a single command. 
*/ -int rewriteAppendOnlyFile(char *filename) { +int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; dictEntry *de; - rio aof; - FILE *fp; - char tmpfile[256]; - int j; - long long now = mstime(); - char byte; size_t processed = 0; + long long now = mstime(); + int j; - /* Note that we have to use a different temp name here compared to the - * one used by rewriteAppendOnlyFileBackground() function. */ - snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); - fp = fopen(tmpfile,"w"); - if (!fp) { - serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); - return C_ERR; - } - - server.aof_child_diff = sdsempty(); - rioInitWithFile(&aof,fp); - if (server.aof_rewrite_incremental_fsync) - rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); - if (!di) { - fclose(fp); - return C_ERR; - } /* SELECT the new DB */ - if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; - if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; + if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; + if (rioWriteBulkLongLong(aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@ -1080,39 +1078,83 @@ int rewriteAppendOnlyFile(char *filename) { if (o->type == OBJ_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkObject(&aof,o) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkObject(aof,o) == 0) goto werr; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(&aof,&key,o) == 0) goto werr; + if 
(rewriteListObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(&aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(&aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(&aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,&key,o) == 0) goto werr; } else { serverPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; } /* Read some diff from the parent process from time to time. */ - if (aof.processed_bytes > processed+1024*10) { - processed = aof.processed_bytes; + if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { + processed = aof->processed_bytes; aofReadDiffFromParent(); } } dictReleaseIterator(di); di = NULL; } + return C_OK; + +werr: + if (di) dictReleaseIterator(di); + return C_ERR; +} + +/* Write a sequence of commands able to fully rebuild the dataset into + * "filename". Used both by REWRITEAOF and BGREWRITEAOF. + * + * In order to minimize the number of commands needed in the rewritten + * log Redis uses variadic commands when possible, such as RPUSH, SADD + * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time + * are inserted using a single command. 
*/ +int rewriteAppendOnlyFile(char *filename) { + rio aof; + FILE *fp; + char tmpfile[256]; + char byte; + + /* Note that we have to use a different temp name here compared to the + * one used by rewriteAppendOnlyFileBackground() function. */ + snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); + fp = fopen(tmpfile,"w"); + if (!fp) { + serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); + return C_ERR; + } + + server.aof_child_diff = sdsempty(); + rioInitWithFile(&aof,fp); + + if (server.aof_rewrite_incremental_fsync) + rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); + + if (server.aof_use_rdb_preamble) { + int error; + if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { + errno = error; + goto werr; + } + } else { + if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; + } /* Do an initial slow fsync here while the parent is still sending * data, in order to make the next final fsync faster. */ @@ -1178,7 +1220,6 @@ werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); - if (di) dictReleaseIterator(di); return C_ERR; } diff --git a/src/config.c b/src/config.c index dd21a0aca..1d81180b7 100644 --- a/src/config.c +++ b/src/config.c @@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) { if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) { + if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -954,6 +958,8 @@ void configSetCommand(client *c) { } config_set_bool_field( "aof-load-truncated",server.aof_load_truncated) { } 
config_set_bool_field( + "aof-use-rdb-preamble",server.aof_use_rdb_preamble) { + } config_set_bool_field( "slave-serve-stale-data",server.repl_serve_stale_data) { } config_set_bool_field( "slave-read-only",server.repl_slave_ro) { @@ -1227,6 +1233,8 @@ void configGetCommand(client *c) { server.aof_rewrite_incremental_fsync); config_get_bool_field("aof-load-truncated", server.aof_load_truncated); + config_get_bool_field("aof-use-rdb-preamble", + server.aof_use_rdb_preamble); config_get_bool_field("lazyfree-lazy-eviction", server.lazyfree_lazy_eviction); config_get_bool_field("lazyfree-lazy-expire", @@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); + rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION); rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE); @@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb) { +int rdbSaveInfoAuxFields(rio *rdb, int flags) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; + int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. 
*/ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; + if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; return 1; } @@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error) { +int rdbSaveRio(rio *rdb, int *error, int flags) { dictIterator *di = NULL; dictEntry *de; char magic[10]; int j; long long now = mstime(); uint64_t cksum; + size_t processed = 0; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) { initStaticStringObject(key,keystr); expire = getExpire(db,&key); if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr; + + /* When this RDB is produced as part of an AOF rewrite, move + * accumulated diff from parent to child while rewriting in + * order to have a smaller final write. 
*/ + if (flags & RDB_SAVE_AOF_PREAMBLE && + rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) + { + processed = rdb->processed_bytes; + aofReadDiffFromParent(); + } } dictReleaseIterator(di); } @@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; return C_OK; @@ -955,7 +968,7 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (rdbSaveRio(&rdb,&error) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) { errno = error; goto werr; } @@ -1373,67 +1386,61 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } -int rdbLoad(char *filename) { +/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, + * otherwise C_ERR is returned and 'errno' is set accordingly. 
*/ +int rdbLoadRio(rio *rdb) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; char buf[1024]; long long expiretime, now = mstime(); - FILE *fp; - rio rdb; - if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - - rioInitWithFile(&rdb,fp); - rdb.update_cksum = rdbLoadProgressCallback; - rdb.max_processing_chunk = server.loading_process_events_interval_bytes; - if (rioRead(&rdb,buf,9) == 0) goto eoferr; + rdb->update_cksum = rdbLoadProgressCallback; + rdb->max_processing_chunk = server.loading_process_events_interval_bytes; + if (rioRead(rdb,buf,9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf,"REDIS",5) != 0) { - fclose(fp); serverLog(LL_WARNING,"Wrong signature trying to load DB from file"); errno = EINVAL; return C_ERR; } rdbver = atoi(buf+5); if (rdbver < 1 || rdbver > RDB_VERSION) { - fclose(fp); serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver); errno = EINVAL; return C_ERR; } - startLoading(fp); while(1) { robj *key, *val; expiretime = -1; /* Read type. */ - if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; + if ((type = rdbLoadType(rdb)) == -1) goto eoferr; /* Handle special types. */ if (type == RDB_OPCODE_EXPIRETIME) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; + if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr; /* We read the time so we need to read the object type again. */ - if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; + if ((type = rdbLoadType(rdb)) == -1) goto eoferr; /* the EXPIRETIME opcode specifies time in seconds, so convert * into milliseconds. */ expiretime *= 1000; } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. 
*/ - if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; + if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr; /* We read the time so we need to read the object type again. */ - if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; + if ((type = rdbLoadType(rdb)) == -1) goto eoferr; } else if (type == RDB_OPCODE_EOF) { /* EOF: End of file, exit the main loop. */ break; } else if (type == RDB_OPCODE_SELECTDB) { /* SELECTDB: Select the specified database. */ - if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; if (dbid >= (unsigned)server.dbnum) { serverLog(LL_WARNING, @@ -1448,9 +1455,9 @@ int rdbLoad(char *filename) { /* RESIZEDB: Hint about the size of the keys in the currently * selected data base, in order to avoid useless rehashing. */ uint64_t db_size, expires_size; - if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) + if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; dictExpand(db->dict,db_size); dictExpand(db->expires,expires_size); @@ -1462,8 +1469,8 @@ int rdbLoad(char *filename) { * * An AUX field is composed of two strings: key and value. 
*/ robj *auxkey, *auxval; - if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; - if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; + if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr; + if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr; if (((char*)auxkey->ptr)[0] == '%') { /* All the fields with a name staring with '%' are considered @@ -1485,9 +1492,9 @@ int rdbLoad(char *filename) { } /* Read key */ - if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; + if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ - if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is @@ -1508,9 +1515,9 @@ int rdbLoad(char *filename) { } /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5 && server.rdb_checksum) { - uint64_t cksum, expected = rdb.cksum; + uint64_t cksum, expected = rdb->cksum; - if (rioRead(&rdb,&cksum,8) == 0) goto eoferr; + if (rioRead(rdb,&cksum,8) == 0) goto eoferr; memrev64ifbe(&cksum); if (cksum == 0) { serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed."); @@ -1519,9 +1526,6 @@ int rdbLoad(char *filename) { rdbExitReportCorruptRDB("RDB CRC error"); } } - - fclose(fp); - stopLoading(); return C_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -1530,6 +1534,24 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return C_ERR; /* Just to avoid warning */ } +/* Like rdbLoadRio() but takes a filename instead of a rio stream. The + * filename is open for reading and a rio stream object created in order + * to do the actual loading. Moreover the ETA displayed in the INFO + * output is initialized and finalized. 
*/ +int rdbLoad(char *filename) { + FILE *fp; + rio rdb; + int retval; + + if ((fp = fopen(filename,"r")) == NULL) return C_ERR; + startLoading(fp); + rioInitWithFile(&rdb,fp); + retval = rdbLoadRio(&rdb); + fclose(fp); + stopLoading(); + return retval; +} + /* A background saving child (BGSAVE) terminated its work. Handle this. * This function covers the case of actual BGSAVEs. */ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { @@ -106,6 +106,9 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) +#define RDB_SAVE_NONE 0 +#define RDB_SAVE_AOF_PREAMBLE (1<<0) + int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); @@ -131,5 +134,6 @@ ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len); void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr); int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); +int rdbLoadRio(rio *rdb); #endif diff --git a/src/server.c b/src/server.c index c737a470f..e794ad132 100644 --- a/src/server.c +++ b/src/server.c @@ -1345,6 +1345,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; + server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); diff --git a/src/server.h b/src/server.h index d410d5b2a..a5f0ee1a6 100644 --- a/src/server.h +++ b/src/server.h @@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define AOF_REWRITE_PERC 100 #define AOF_REWRITE_MIN_SIZE (64*1024*1024) #define AOF_REWRITE_ITEMS_PER_CMD 64 +#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10) #define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000 #define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128 #define CONFIG_DEFAULT_MAX_CLIENTS 10000 @@ -136,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof" #define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0 #define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1 +#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0 #define CONFIG_DEFAULT_ACTIVE_REHASHING 1 #define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1 #define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0 @@ -900,6 +902,7 @@ struct redisServer { int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ + int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ /* AOF pipes used to communicate between parent and child during rewrite. */ int aof_pipe_write_data_to_child; int aof_pipe_read_data_from_parent; @@ -1365,6 +1368,7 @@ void stopLoading(void); /* RDB persistence */ #include "rdb.h" +int rdbSaveRio(rio *rdb, int *error, int flags); /* AOF persistence */ void flushAppendOnlyFile(int force); @@ -1377,6 +1381,7 @@ int startAppendOnly(void); void backgroundRewriteDoneHandler(int exitcode, int bysignal); void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); +ssize_t aofReadDiffFromParent(void); /* Sorted sets data type */ diff --git a/tests/unit/aofrw.tcl b/tests/unit/aofrw.tcl index 4fdbdc6c6..c5430eedc 100644 --- a/tests/unit/aofrw.tcl +++ b/tests/unit/aofrw.tcl @@ -4,60 +4,63 @@ start_server {tags {"aofrw"}} { r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite. 
waitForBgrewriteaof r - test {AOF rewrite during write load} { - # Start a write load for 10 seconds - set master [srv 0 client] - set master_host [srv 0 host] - set master_port [srv 0 port] - set load_handle0 [start_write_load $master_host $master_port 10] - set load_handle1 [start_write_load $master_host $master_port 10] - set load_handle2 [start_write_load $master_host $master_port 10] - set load_handle3 [start_write_load $master_host $master_port 10] - set load_handle4 [start_write_load $master_host $master_port 10] - - # Make sure the instance is really receiving data - wait_for_condition 50 100 { - [r dbsize] > 0 - } else { - fail "No write load detected." - } + foreach rdbpre {yes no} { + r config set aof-use-rdb-preamble $rdbpre + test "AOF rewrite during write load: RDB preamble=$rdbpre" { + # Start a write load for 10 seconds + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + set load_handle0 [start_write_load $master_host $master_port 10] + set load_handle1 [start_write_load $master_host $master_port 10] + set load_handle2 [start_write_load $master_host $master_port 10] + set load_handle3 [start_write_load $master_host $master_port 10] + set load_handle4 [start_write_load $master_host $master_port 10] + + # Make sure the instance is really receiving data + wait_for_condition 50 100 { + [r dbsize] > 0 + } else { + fail "No write load detected." + } - # After 3 seconds, start a rewrite, while the write load is still - # active. - after 3000 - r bgrewriteaof - waitForBgrewriteaof r + # After 3 seconds, start a rewrite, while the write load is still + # active. + after 3000 + r bgrewriteaof + waitForBgrewriteaof r + + # Let it run a bit more so that we'll append some data to the new + # AOF. 
+ after 1000 + + # Stop the processes generating the load if they are still active + stop_write_load $load_handle0 + stop_write_load $load_handle1 + stop_write_load $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 + + # Make sure that we remain the only connected client. + # This step is needed to make sure there are no pending writes + # that will be processed between the two "debug digest" calls. + wait_for_condition 50 100 { + [llength [split [string trim [r client list]] "\n"]] == 1 + } else { + puts [r client list] + fail "Clients generating loads are not disconnecting" + } - # Let it run a bit more so that we'll append some data to the new - # AOF. - after 1000 + # Get the data set digest + set d1 [r debug digest] - # Stop the processes generating the load if they are still active - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 + # Load the AOF + r debug loadaof + set d2 [r debug digest] - # Make sure that we remain the only connected client. - # This step is needed to make sure there are no pending writes - # that will be processed between the two "debug digest" calls. - wait_for_condition 50 100 { - [llength [split [string trim [r client list]] "\n"]] == 1 - } else { - puts [r client list] - fail "Clients generating loads are not disconnecting" + # Make sure they are the same + assert {$d1 eq $d2} } - - # Get the data set digest - set d1 [r debug digest] - - # Load the AOF - r debug loadaof - set d2 [r debug digest] - - # Make sure they are the same - assert {$d1 eq $d2} } } |