summaryrefslogtreecommitdiff
path: root/src/rdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rdb.c')
-rw-r--r--src/rdb.c80
1 files changed, 63 insertions, 17 deletions
diff --git a/src/rdb.c b/src/rdb.c
index f530219a4..b569edfea 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
+int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
- int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
+ int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
+int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
@@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
- if (flags & RDB_SAVE_AOF_PREAMBLE &&
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
@@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
+ startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
+ stopSaving(1);
return C_OK;
werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
+ stopSaving(0);
return C_ERR;
}
@@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
}
rioInitWithFile(&rdb,fp);
+ startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
- if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
+ stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
+ stopSaving(0);
return C_ERR;
}
@@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(size_t size) {
+void startLoading(size_t size, int rdbflags) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
+
+ /* Fire the loading modules start event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
+ else if(rdbflags & RDBFLAGS_REPLICATION)
+ subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
-void startLoadingFile(FILE *fp, char* filename) {
+void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size);
+ startLoading(sb.st_size, rdbflags);
}
/* Refresh the loading progress info */
@@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
}
/* Loading finished */
-void stopLoading(void) {
+void stopLoading(int success) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
+
+ /* Fire the loading modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
+ success?
+ REDISMODULE_SUBEVENT_LOADING_ENDED:
+ REDISMODULE_SUBEVENT_LOADING_FAILED,
+ NULL);
+}
+
+void startSaving(int rdbflags) {
+ /* Fire the persistence modules end event. */
+ int subevent;
+ if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
+ else if (getpid()!=server.pid)
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
+ else
+ subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
+}
+
+void stopSaving(int success) {
+ /* Fire the persistence modules end event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
+ success?
+ REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
+ REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
+ NULL);
}
/* Track loading progress in order to serve client's from time to time
@@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
+ processModuleLoadingProgressEvent(0);
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
- if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
+ if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@@ -2243,17 +2289,17 @@ eoferr:
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
-int rdbLoad(char *filename, rdbSaveInfo *rsi) {
+int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
- startLoadingFile(fp, filename);
+ startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb,rsi,0);
+ retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
- stopLoading();
+ stopLoading(retval==C_OK);
return retval;
}