summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOzan Tezcan <ozantezcan@gmail.com>2023-04-09 12:07:32 +0300
committerGitHub <noreply@github.com>2023-04-09 12:07:32 +0300
commite55568edb58c1437d6a84e1baa103dcf5b6153ef (patch)
tree04377f49bbdbba0be4c6e5514a0a274ec8b05d89 /src
parentf263b6daf3a2672acc383dc34ed1ff1fe19da5a7 (diff)
downloadredis-e55568edb58c1437d6a84e1baa103dcf5b6153ef.tar.gz
Add RM_RdbLoad and RM_RdbSave module API functions (#11852)
Add `RM_RdbLoad()` and `RM_RdbSave()` to load/save RDB files from the module API. In our use case, we have our clustering implementation as a module. As part of this implementation, the module needs to trigger RDB save operation at specific points. Also, this module delivers RDB files to other nodes (not using Redis' replication). When a node receives an RDB file, it should be able to load the RDB. Currently, there is no module API to save/load RDB files. This PR adds four new APIs: ```c RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename); void RM_RdbStreamFree(RedisModuleRdbStream *stream); int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags); int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags); ``` The first step is to create a `RedisModuleRdbStream` object. This PR provides a function to create RedisModuleRdbStream from the filename. (You can load/save RDB with the filename). In the future, this API can be extended if needed: e.g., `RM_RdbStreamCreateFromFd()`, `RM_RdbStreamCreateFromSocket()` to save/load RDB from an `fd` or a `socket`. Usage: ```c /* Save RDB */ RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile("example.rdb"); RedisModule_RdbSave(ctx, stream, 0); RedisModule_RdbStreamFree(stream); /* Load RDB */ RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile("example.rdb"); RedisModule_RdbLoad(ctx, stream, 0); RedisModule_RdbStreamFree(stream); ```
Diffstat (limited to 'src')
-rw-r--r--src/module.c135
-rw-r--r--src/rdb.c71
-rw-r--r--src/rdb.h1
-rw-r--r--src/redismodule.h9
4 files changed, 198 insertions, 18 deletions
diff --git a/src/module.c b/src/module.c
index 066786976..ae75b3dc2 100644
--- a/src/module.c
+++ b/src/module.c
@@ -12751,6 +12751,137 @@ int RM_LoadConfigs(RedisModuleCtx *ctx) {
return REDISMODULE_OK;
}
+/* --------------------------------------------------------------------------
+ * ## RDB load/save API
+ * -------------------------------------------------------------------------- */
+
+#define REDISMODULE_RDB_STREAM_FILE 1
+
+typedef struct RedisModuleRdbStream {
+ int type;
+
+ union {
+ char *filename;
+ } data;
+} RedisModuleRdbStream;
+
+/* Create a stream object to save/load RDB to/from a file.
+ *
+ * This function returns a pointer to RedisModuleRdbStream which is owned
+ * by the caller. It requires a call to RM_RdbStreamFree() to free
+ * the object. */
+RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename) {
+ RedisModuleRdbStream *stream = zmalloc(sizeof(*stream));
+ stream->type = REDISMODULE_RDB_STREAM_FILE;
+ stream->data.filename = zstrdup(filename);
+ return stream;
+}
+
+/* Release an RDB stream object. */
+void RM_RdbStreamFree(RedisModuleRdbStream *stream) {
+ switch (stream->type) {
+ case REDISMODULE_RDB_STREAM_FILE:
+ zfree(stream->data.filename);
+ break;
+ default:
+ serverAssert(0);
+ break;
+ }
+ zfree(stream);
+}
+
+/* Load RDB file from the `stream`. Dataset will be cleared first and then RDB
+ * file will be loaded.
+ *
+ * `flags` must be zero. This parameter is for future use.
+ *
+ * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
+ * and errno is set accordingly.
+ *
+ * Example:
+ *
+ * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
+ * RedisModule_RdbLoad(ctx, s, 0);
+ * RedisModule_RdbStreamFree(s);
+ */
+int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
+ UNUSED(ctx);
+
+ if (!stream || flags != 0) {
+ errno = EINVAL;
+ return REDISMODULE_ERR;
+ }
+
+ /* Not allowed on replicas. */
+ if (server.masterhost != NULL) {
+ errno = ENOTSUP;
+ return REDISMODULE_ERR;
+ }
+
+ /* Drop replicas if exist. */
+ disconnectSlaves();
+ freeReplicationBacklog();
+
+ if (server.aof_state != AOF_OFF) stopAppendOnly();
+
+ /* Kill existing RDB fork as it is saving outdated data. Also killing it
+ * will prevent COW memory issue. */
+ if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
+
+ emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
+
+ /* rdbLoad() can go back to the networking and process network events. If
+ * RM_RdbLoad() is called inside a command callback, we don't want to
+ * process the current client. Otherwise, we may free the client or try to
+ * process next message while we are already in the command callback. */
+ if (server.current_client) protectClient(server.current_client);
+
+ serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
+ int ret = rdbLoad(stream->data.filename,NULL,RDBFLAGS_NONE);
+
+ if (server.current_client) unprotectClient(server.current_client);
+ if (server.aof_state != AOF_OFF) startAppendOnly();
+
+ if (ret != RDB_OK) {
+ errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO;
+ return REDISMODULE_ERR;
+ }
+
+ errno = 0;
+ return REDISMODULE_OK;
+}
+
+/* Save dataset to the RDB stream.
+ *
+ * `flags` must be zero. This parameter is for future use.
+ *
+ * On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
+ * and errno is set accordingly.
+ *
+ * Example:
+ *
+ * RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
+ * RedisModule_RdbSave(ctx, s, 0);
+ * RedisModule_RdbStreamFree(s);
+ */
+int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
+ UNUSED(ctx);
+
+ if (!stream || flags != 0) {
+ errno = EINVAL;
+ return REDISMODULE_ERR;
+ }
+
+ serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
+
+ if (rdbSaveToFile(stream->data.filename) != C_OK) {
+ return REDISMODULE_ERR;
+ }
+
+ errno = 0;
+ return REDISMODULE_OK;
+}
+
/* Redis MODULE command.
*
* MODULE LIST
@@ -13627,4 +13758,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RegisterEnumConfig);
REGISTER_API(LoadConfigs);
REGISTER_API(RegisterAuthCallback);
+ REGISTER_API(RdbStreamCreateFromFile);
+ REGISTER_API(RdbStreamFree);
+ REGISTER_API(RdbLoad);
+ REGISTER_API(RdbSave);
}
diff --git a/src/rdb.c b/src/rdb.c
index a2be21ccb..6ebc0751e 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1437,31 +1437,29 @@ werr: /* Write error. */
return C_ERR;
}
-/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
-int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
- char tmpfile[256];
+static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
- FILE *fp = NULL;
rio rdb;
int error = 0;
+ int saved_errno;
char *err_op; /* For a detailed log */
- snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
- fp = fopen(tmpfile,"w");
+ FILE *fp = fopen(filename,"w");
if (!fp) {
+ saved_errno = errno;
char *str_err = strerror(errno);
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the temp RDB file %s (in server root dir %s) "
"for saving: %s",
- tmpfile,
+ filename,
cwdp ? cwdp : "unknown",
str_err);
+ errno = saved_errno;
return C_ERR;
}
rioInitWithFile(&rdb,fp);
- startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync) {
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
@@ -1481,7 +1479,46 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno));
}
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
- fp = NULL;
+
+ return C_OK;
+
+werr:
+ saved_errno = errno;
+ serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno));
+ if (fp) fclose(fp);
+ unlink(filename);
+ errno = saved_errno;
+ return C_ERR;
+}
+
+/* Save DB to the file. Similar to rdbSave() but this function won't use a
+ * temporary file and won't update the metrics. */
+int rdbSaveToFile(const char *filename) {
+ startSaving(RDBFLAGS_NONE);
+
+ if (rdbSaveInternal(SLAVE_REQ_NONE,filename,NULL,RDBFLAGS_NONE) != C_OK) {
+ int saved_errno = errno;
+ stopSaving(0);
+ errno = saved_errno;
+ return C_ERR;
+ }
+
+ stopSaving(1);
+ return C_OK;
+}
+
+/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
+int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
+ char tmpfile[256];
+ char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
+
+ startSaving(RDBFLAGS_NONE);
+ snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
+
+ if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
+ stopSaving(0);
+ return C_ERR;
+ }
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
@@ -1499,7 +1536,12 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
stopSaving(0);
return C_ERR;
}
- if (fsyncFileDir(filename) == -1) { err_op = "fsyncFileDir"; goto werr; }
+ if (fsyncFileDir(filename) != 0) {
+ serverLog(LL_WARNING,
+ "Failed to fsync directory while saving DB: %s", strerror(errno));
+ stopSaving(0);
+ return C_ERR;
+ }
serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
@@ -1507,13 +1549,6 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
-
-werr:
- serverLog(LL_WARNING,"Write error saving DB on disk(%s): %s", err_op, strerror(errno));
- if (fp) fclose(fp);
- unlink(tmpfile);
- stopSaving(0);
- return C_ERR;
}
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
@@ -3361,7 +3396,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
/* Reclaim the cache backed by rdb */
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
/* TODO: maybe we could combine the fopen and open into one in the future */
- rdb_fd = open(server.rdb_filename, O_RDONLY);
+ rdb_fd = open(filename, O_RDONLY);
if (rdb_fd > 0) bioCreateCloseJob(rdb_fd, 0, 1);
}
return (retval==C_OK) ? RDB_OK : RDB_FAILED;
diff --git a/src/rdb.h b/src/rdb.h
index dc096fd8b..234bde221 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -157,6 +157,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
+int rdbSaveToFile(const char *filename);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
diff --git a/src/redismodule.h b/src/redismodule.h
index 13ebc3829..f58da2f49 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -881,6 +881,7 @@ typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef struct RedisModuleUser RedisModuleUser;
typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
+typedef struct RedisModuleRdbStream RedisModuleRdbStream;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
@@ -1303,6 +1304,10 @@ REDISMODULE_API int (*RedisModule_RegisterNumericConfig)(RedisModuleCtx *ctx, co
REDISMODULE_API int (*RedisModule_RegisterStringConfig)(RedisModuleCtx *ctx, const char *name, const char *default_val, unsigned int flags, RedisModuleConfigGetStringFunc getfn, RedisModuleConfigSetStringFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RegisterEnumConfig)(RedisModuleCtx *ctx, const char *name, int default_val, unsigned int flags, const char **enum_values, const int *int_values, int num_enum_vals, RedisModuleConfigGetEnumFunc getfn, RedisModuleConfigSetEnumFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_LoadConfigs)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
+REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(const char *filename) REDISMODULE_ATTR;
+REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR;
+REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
+REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
@@ -1658,6 +1663,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RegisterStringConfig);
REDISMODULE_GET_API(RegisterEnumConfig);
REDISMODULE_GET_API(LoadConfigs);
+ REDISMODULE_GET_API(RdbStreamCreateFromFile);
+ REDISMODULE_GET_API(RdbStreamFree);
+ REDISMODULE_GET_API(RdbLoad);
+ REDISMODULE_GET_API(RdbSave);
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);