summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2016-08-09 11:07:32 +0200
committerantirez <antirez@gmail.com>2016-08-09 11:07:32 +0200
commit4426cb11e28dd35c08ac45282fb2a7a42a7638af (patch)
tree64b4690eec48e5274775040ee82ffca9efe3d4c5
parent9f779b33b54615d4bbaaafadfc5d5f9b93e2166b (diff)
downloadredis-4426cb11e28dd35c08ac45282fb2a7a42a7638af.tar.gz
RDB AOF preamble: WIP 1.
-rw-r--r--src/aof.c79
-rw-r--r--src/rdb.c23
-rw-r--r--src/rdb.h3
-rw-r--r--src/server.h2
4 files changed, 72 insertions, 35 deletions
diff --git a/src/aof.c b/src/aof.c
index 6a92a0cd9..39229b5df 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -989,7 +989,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
/* Call the module type callback in order to rewrite a data type
- * taht is exported by a module and is not handled by Redis itself.
+ * that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io;
@@ -1015,37 +1015,11 @@ ssize_t aofReadDiffFromParent(void) {
return total;
}
-/* Write a sequence of commands able to fully rebuild the dataset into
- * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
- *
- * In order to minimize the number of commands needed in the rewritten
- * log Redis uses variadic commands when possible, such as RPUSH, SADD
- * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
- * are inserted using a single command. */
-int rewriteAppendOnlyFile(char *filename) {
+void rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
- rio aof;
- FILE *fp;
- char tmpfile[256];
- int j;
- long long now = mstime();
- char byte;
size_t processed = 0;
- /* Note that we have to use a different temp name here compared to the
- * one used by rewriteAppendOnlyFileBackground() function. */
- snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
- fp = fopen(tmpfile,"w");
- if (!fp) {
- serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
- return C_ERR;
- }
-
- server.aof_child_diff = sdsempty();
- rioInitWithFile(&aof,fp);
- if (server.aof_rewrite_incremental_fsync)
- rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
@@ -1105,7 +1079,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */
- if (aof.processed_bytes > processed+1024*10) {
+ if (aof.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof.processed_bytes;
aofReadDiffFromParent();
}
@@ -1113,6 +1087,52 @@ int rewriteAppendOnlyFile(char *filename) {
dictReleaseIterator(di);
di = NULL;
}
+ return C_OK;
+
+werr:
+ if (di) dictReleaseIterator(di);
+ return C_ERR;
+}
+
+/* Write a sequence of commands able to fully rebuild the dataset into
+ * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
+ *
+ * In order to minimize the number of commands needed in the rewritten
+ * log Redis uses variadic commands when possible, such as RPUSH, SADD
+ * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
+ * are inserted using a single command. */
+int rewriteAppendOnlyFile(char *filename) {
+ rio aof;
+ FILE *fp;
+ char tmpfile[256];
+ int j;
+ long long now = mstime();
+ char byte;
+
+ /* Note that we have to use a different temp name here compared to the
+ * one used by rewriteAppendOnlyFileBackground() function. */
+ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
+ fp = fopen(tmpfile,"w");
+ if (!fp) {
+ serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
+ return C_ERR;
+ }
+
+ server.aof_child_diff = sdsempty();
+ rioInitWithFile(&aof,fp);
+
+ if (server.aof_rewrite_incremental_fsync)
+ rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
+
+ if (server.aof_use_rdb_prefix) {
+ int error;
+ if (rdbSaveRio(&rdb,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
+ errno = error;
+ goto werr;
+ }
+ } else {
+ rewriteAppendOnlyFileRio(&aof);
+ }
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
@@ -1178,7 +1198,6 @@ werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
- if (di) dictReleaseIterator(di);
return C_ERR;
}
diff --git a/src/rdb.c b/src/rdb.c
index 859297943..83e5868cd 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb) {
+int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
+ int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
+ if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble)) return -1;
return 1;
}
@@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error) {
+int rdbSaveRio(rio *rdb, int *error, int flags) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
long long now = mstime();
uint64_t cksum;
+ size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) {
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
+
+ /* When this RDB is produced as part of an AOF rewrite, move
+ * accumulated diff from parent to child while rewriting in
+ * order to have a smaller final write. */
+ if (flags & RDB_SAVE_AOF_PREAMBLE &&
+ rdb.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
+ {
+ processed = rdb.processed_bytes;
+ aofReadDiffFromParent();
+ }
}
dictReleaseIterator(di);
}
@@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@@ -955,7 +968,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
- if (rdbSaveRio(&rdb,&error) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
errno = error;
goto werr;
}
diff --git a/src/rdb.h b/src/rdb.h
index a71ecb16e..2c9a99850 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -106,6 +106,9 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
+#define RDB_SAVE_NONE 0
+#define RDB_SAVE_AOF_PREAMBLE (1<<0)
+
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
int rdbSaveTime(rio *rdb, time_t t);
diff --git a/src/server.h b/src/server.h
index d410d5b2a..2bc985cbb 100644
--- a/src/server.h
+++ b/src/server.h
@@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_REWRITE_PERC 100
#define AOF_REWRITE_MIN_SIZE (64*1024*1024)
#define AOF_REWRITE_ITEMS_PER_CMD 64
+#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
#define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000
#define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128
#define CONFIG_DEFAULT_MAX_CLIENTS 10000
@@ -1365,6 +1366,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
+int rdbSaveRio(rio *rdb, int *error, int flags);
/* AOF persistence */
void flushAppendOnlyFile(int force);