diff options
author | Meir Shpilraien (Spielrein) <meir@redis.com> | 2021-12-26 09:03:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-26 09:03:37 +0200 |
commit | 365cbf46a717d660bbe9c832b6c9d7fc15982029 (patch) | |
tree | c301a721b10795620522ba5a7a2eee63aeb2992c /src/rdb.c | |
parent | 08ff606b0bf93e5c5e62927cc9dbd229a28ee379 (diff) | |
download | redis-365cbf46a717d660bbe9c832b6c9d7fc15982029.tar.gz |
Add FUNCTION DUMP and RESTORE. (#9938)
Follow the conclusions to support Functions in redis cluster (#9899)
Added 2 new FUNCTION sub-commands:
1. `FUNCTION DUMP` - dump a binary payload representation of all the functions.
2. `FUNCTION RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]` - give the binary payload extracted
using `FUNCTION DUMP`, restore all the functions on the given payload. Restore policy can be given to
control how to handle existing functions (default is APPEND):
* FLUSH: delete all existing functions.
* APPEND: appends the restored functions to the existing functions. On collision, abort.
* REPLACE: appends the restored functions to the existing functions. On collision,
replace the old function with the new function.
Modify `redis-cli --cluster add-node` to use `FUNCTION DUMP` to get existing functions from
one of the nodes in the cluster, and `FUNCTION RESTORE` to load the same set of functions
to the new node. `redis-cli` will execute this step before sending the `CLUSTER MEET` command
to the new node. If `FUNCTION DUMP` returns an error, assume the current Redis version do not
support functions and skip `FUNCTION RESTORE`. If `FUNCTION RESTORE` fails, abort and do not send
the `CLUSTER MEET` command. If the new node already contains functions (before the `FUNCTION RESTORE`
is sent), abort and do not add the node to the cluster. Test was added to verify
`redis-cli --cluster add-node` works as expected.
Diffstat (limited to 'src/rdb.c')
-rw-r--r-- | src/rdb.c | 82 |
1 files changed, 52 insertions, 30 deletions
@@ -1214,6 +1214,30 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { return io.bytes; } +int functionsSaveRio(rio *rdb) { + int ret = C_ERR; + dict *functions = functionsGet(); + dictIterator *iter = dictGetIterator(functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + rdbSaveType(rdb, RDB_OPCODE_FUNCTION); + functionInfo *fi = dictGetVal(entry); + if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done; + if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done; + if (fi->desc) { + if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */ + if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done; + } else { + if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */ + } + if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done; + } + ret = C_OK; +done: + dictReleaseIterator(iter); + return ret; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1240,24 +1264,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; - /* save functions */ - dict *functions = functionsGet(); - dictIterator *iter = dictGetIterator(functions); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - rdbSaveType(rdb, RDB_OPCODE_FUNCTION); - functionInfo* fi = dictGetVal(entry); - if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr; - if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr; - if (fi->desc) { - if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */ - if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr; - } else { - if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */ - } - if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr; - } - dictReleaseIterator(iter); + if (functionsSaveRio(rdb) != C_OK) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -2697,42 +2704,48 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } -static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags) { +/* Save the given functions_ctx to the rdb. + * The err output parameter is optional and will be set with relevant error + * message on failure, it is the caller responsibility to free the error + * message on failure. */ +int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err) { UNUSED(ver); sds name = NULL; sds engine_name = NULL; sds desc = NULL; sds blob = NULL; - sds err = NULL; uint64_t has_desc; + sds error = NULL; int res = C_ERR; if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function name"); + error = sdsnew("Failed loading function name"); goto error; } if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading engine name"); + error = sdsnew("Failed loading engine name"); goto error; } if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) { - serverLog(LL_WARNING, "Failed loading function desc indicator"); + error = sdsnew("Failed loading function description indicator"); goto error; } if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function desc"); + error = sdsnew("Failed loading function description"); goto error; } if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function blob"); + error = sdsnew("Failed loading function blob"); goto error; } - if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &err, functions_ctx) != C_OK) { - serverLog(LL_WARNING, "Failed compiling and saving the function %s", err); + if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &error, functions_ctx) != C_OK) { + if (!error) { + error = sdsnew("Failed creating the function"); + } goto error; } @@ -2743,7 +2756,14 @@ error: if (engine_name) sdsfree(engine_name); if (desc) sdsfree(desc); if (blob) sdsfree(blob); - if (err) sdsfree(err); + if (error) { + if (err) { + *err = error; + } else { + serverLog(LL_WARNING, "Failed creating function, %s", error); + sdsfree(error); + } + } return res; } @@ -2964,8 +2984,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; /* Read next opcode. */ } } else if (type == RDB_OPCODE_FUNCTION) { - if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags) != C_OK) { - serverLog(LL_WARNING,"Failed loading function"); + sds err = NULL; + if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags, &err) != C_OK) { + serverLog(LL_WARNING,"Failed loading function, %s", err); + sdsfree(err); goto eoferr; } continue; |