summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMeir Shpilraien (Spielrein) <meir@redis.com>2021-12-26 09:03:37 +0200
committerGitHub <noreply@github.com>2021-12-26 09:03:37 +0200
commit365cbf46a717d660bbe9c832b6c9d7fc15982029 (patch)
treec301a721b10795620522ba5a7a2eee63aeb2992c /src
parent08ff606b0bf93e5c5e62927cc9dbd229a28ee379 (diff)
downloadredis-365cbf46a717d660bbe9c832b6c9d7fc15982029.tar.gz
Add FUNCTION DUMP and RESTORE. (#9938)
Follow the conclusions to support Functions in redis cluster (#9899) Added 2 new FUNCTION sub-commands: 1. `FUNCTION DUMP` - dump a binary payload representation of all the functions. 2. `FUNCTION RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]` - give the binary payload extracted using `FUNCTION DUMP`, restore all the functions on the given payload. Restore policy can be given to control how to handle existing functions (default is APPEND): * FLUSH: delete all existing functions. * APPEND: appends the restored functions to the existing functions. On collision, abort. * REPLACE: appends the restored functions to the existing functions. On collision, replace the old function with the new function. Modify `redis-cli --cluster add-node` to use `FUNCTION DUMP` to get existing functions from one of the nodes in the cluster, and `FUNCTION RESTORE` to load the same set of functions to the new node. `redis-cli` will execute this step before sending the `CLUSTER MEET` command to the new node. If `FUNCTION DUMP` returns an error, assume the current Redis version do not support functions and skip `FUNCTION RESTORE`. If `FUNCTION RESTORE` fails, abort and do not send the `CLUSTER MEET` command. If the new node already contains functions (before the `FUNCTION RESTORE` is sent), abort and do not add the node to the cluster. Test was added to verify `redis-cli --cluster add-node` works as expected.
Diffstat (limited to 'src')
-rw-r--r--src/cluster.c10
-rw-r--r--src/commands.c33
-rw-r--r--src/commands/function-dump.json17
-rw-r--r--src/commands/function-restore.json46
-rw-r--r--src/functions.c166
-rw-r--r--src/rdb.c82
-rw-r--r--src/rdb.h2
-rw-r--r--src/redis-cli.c41
-rw-r--r--src/server.h3
9 files changed, 367 insertions, 33 deletions
diff --git a/src/cluster.c b/src/cluster.c
index d44f41ad8..15d2fc38f 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -5398,8 +5398,9 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
/* Verify that the RDB version of the dump payload matches the one of this Redis
* instance and that the checksum is ok.
* If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
- * is returned. */
-int verifyDumpPayload(unsigned char *p, size_t len) {
+ * is returned. If rdbver_ptr is not NULL, its populated with the value read
+ * from the input buffer. */
+int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
unsigned char *footer;
uint16_t rdbver;
uint64_t crc;
@@ -5410,6 +5411,9 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
/* Verify RDB version */
rdbver = (footer[1] << 8) | footer[0];
+ if (rdbver_ptr) {
+ *rdbver_ptr = rdbver;
+ }
if (rdbver > RDB_VERSION) return C_ERR;
if (server.skip_checksum_validation)
@@ -5499,7 +5503,7 @@ void restoreCommand(client *c) {
}
/* Verify RDB version and data checksum. */
- if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
+ if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
diff --git a/src/commands.c b/src/commands.c
index 0a07fb25f..2514c51b6 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -3064,6 +3064,14 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = {
{0}
};
+/********** FUNCTION DUMP ********************/
+
+/* FUNCTION DUMP history */
+#define FUNCTION_DUMP_History NULL
+
+/* FUNCTION DUMP hints */
+#define FUNCTION_DUMP_Hints NULL
+
/********** FUNCTION FLUSH ********************/
/* FUNCTION FLUSH history */
@@ -3124,6 +3132,29 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
/* FUNCTION LIST hints */
#define FUNCTION_LIST_Hints NULL
+/********** FUNCTION RESTORE ********************/
+
+/* FUNCTION RESTORE history */
+#define FUNCTION_RESTORE_History NULL
+
+/* FUNCTION RESTORE hints */
+#define FUNCTION_RESTORE_Hints NULL
+
+/* FUNCTION RESTORE policy argument table */
+struct redisCommandArg FUNCTION_RESTORE_policy_Subargs[] = {
+{"flush",ARG_TYPE_PURE_TOKEN,-1,"FLUSH",NULL,NULL,CMD_ARG_NONE},
+{"append",ARG_TYPE_PURE_TOKEN,-1,"APPEND",NULL,NULL,CMD_ARG_NONE},
+{"replace",ARG_TYPE_PURE_TOKEN,-1,"REPLACE",NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
+/* FUNCTION RESTORE argument table */
+struct redisCommandArg FUNCTION_RESTORE_Args[] = {
+{"serialized-value",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{"policy",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_RESTORE_policy_Subargs},
+{0}
+};
+
/********** FUNCTION STATS ********************/
/* FUNCTION STATS history */
@@ -3136,11 +3167,13 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
struct redisCommand FUNCTION_Subcommands[] = {
{"create","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
{"delete","Delete a function by name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
+{"dump","Dump all functions into a serialized binary payload","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DUMP_History,FUNCTION_DUMP_Hints,functionDumpCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
{"info","Return information about a function by function name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
+{"restore","Restore all the functions on the given payload","O(N) where N is the number of functions on the payload","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_RESTORE_History,FUNCTION_RESTORE_Hints,functionRestoreCommand,-3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_RESTORE_Args},
{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{0}
};
diff --git a/src/commands/function-dump.json b/src/commands/function-dump.json
new file mode 100644
index 000000000..de402f589
--- /dev/null
+++ b/src/commands/function-dump.json
@@ -0,0 +1,17 @@
+{
+ "DUMP": {
+ "summary": "Dump all functions into a serialized binary payload",
+ "complexity": "O(N) where N is the number of functions",
+ "group": "scripting",
+ "since": "7.0.0",
+ "arity": 2,
+ "container": "FUNCTION",
+ "function": "functionDumpCommand",
+ "command_flags": [
+ "NOSCRIPT"
+ ],
+ "acl_categories": [
+ "SCRIPTING"
+ ]
+ }
+}
diff --git a/src/commands/function-restore.json b/src/commands/function-restore.json
new file mode 100644
index 000000000..bc9b32be4
--- /dev/null
+++ b/src/commands/function-restore.json
@@ -0,0 +1,46 @@
+{
+ "RESTORE": {
+ "summary": "Restore all the functions on the given payload",
+ "complexity": "O(N) where N is the number of functions on the payload",
+ "group": "scripting",
+ "since": "7.0.0",
+ "arity": -3,
+ "container": "FUNCTION",
+ "function": "functionRestoreCommand",
+ "command_flags": [
+ "NOSCRIPT",
+ "WRITE"
+ ],
+ "acl_categories": [
+ "SCRIPTING"
+ ],
+ "arguments": [
+ {
+ "name": "serialized-value",
+ "type": "string"
+ },
+ {
+ "name": "policy",
+ "type": "oneof",
+ "optional": true,
+ "arguments": [
+ {
+ "name": "flush",
+ "type": "pure-token",
+ "token": "FLUSH"
+ },
+ {
+ "name": "append",
+ "type": "pure-token",
+ "token": "APPEND"
+ },
+ {
+ "name": "replace",
+ "type": "pure-token",
+ "token": "REPLACE"
+ }
+ ]
+ }
+ ]
+ }
+}
diff --git a/src/functions.c b/src/functions.c
index 8da845250..2da142ecb 100644
--- a/src/functions.c
+++ b/src/functions.c
@@ -33,6 +33,10 @@
#include "adlist.h"
#include "atomicvar.h"
+typedef enum {
+ restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace
+} restorePolicy;
+
static size_t engine_cache_memory = 0;
/* Forward declaration */
@@ -81,6 +85,9 @@ static size_t functionMallocSize(functionInfo *fi) {
/* Dispose function memory */
static void engineFunctionDispose(dict *d, void *obj) {
UNUSED(d);
+ if (!obj) {
+ return;
+ }
functionInfo *fi = obj;
sdsfree(fi->code);
sdsfree(fi->name);
@@ -377,6 +384,156 @@ void fcallroCommand(client *c) {
fcallCommandGeneric(c, 1);
}
+/*
+ * FUNCTION DUMP
+ *
+ * Returns a binary payload representing all the functions.
+ * Can be loaded using FUNCTION RESTORE
+ *
+ * The payload structure is the same as on RDB. Each function
+ * is saved separately with the following information:
+ * * Function name
+ * * Engine name
+ * * Function description
+ * * Function code
+ * RDB_OPCODE_FUNCTION is saved before each function to present
+ * that the payload is a function.
+ * RDB version and crc64 is saved at the end of the payload.
+ * The RDB version is saved for backward compatibility.
+ * crc64 is saved so we can verify the payload content.
+ */
+void functionDumpCommand(client *c) {
+ unsigned char buf[2];
+ uint64_t crc;
+ rio payload;
+ rioInitWithBuffer(&payload, sdsempty());
+
+ functionsSaveRio(&payload);
+
+ /* RDB version */
+ buf[0] = RDB_VERSION & 0xff;
+ buf[1] = (RDB_VERSION >> 8) & 0xff;
+ payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2);
+
+ /* CRC64 */
+ crc = crc64(0, (unsigned char*) payload.io.buffer.ptr,
+ sdslen(payload.io.buffer.ptr));
+ memrev64ifbe(&crc);
+ payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8);
+
+ addReplyBulkSds(c, payload.io.buffer.ptr);
+}
+
+/*
+ * FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE]
+ *
+ * Restore the functions represented by the give payload.
+ * Restore policy to can be given to control how to handle existing functions (default APPEND):
+ * * FLUSH: delete all existing functions.
+ * * APPEND: appends the restored functions to the existing functions. On collision, abort.
+ * * REPLACE: appends the restored functions to the existing functions.
+ * On collision, replace the old function with the new function.
+ */
+void functionRestoreCommand(client *c) {
+ if (c->argc > 4) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+
+ restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */
+ sds data = c->argv[2]->ptr;
+ size_t data_len = sdslen(data);
+ rio payload;
+ dictIterator *iter = NULL;
+ sds err = NULL;
+
+ if (c->argc == 4) {
+ const char *restore_policy_str = c->argv[3]->ptr;
+ if (!strcasecmp(restore_policy_str, "append")) {
+ restore_replicy = restorePolicy_Append;
+ } else if (!strcasecmp(restore_policy_str, "replace")) {
+ restore_replicy = restorePolicy_Replace;
+ } else if (!strcasecmp(restore_policy_str, "flush")) {
+ restore_replicy = restorePolicy_Flush;
+ } else {
+ addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE.");
+ return;
+ }
+ }
+
+ uint16_t rdbver;
+ if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) {
+ addReplyError(c, "DUMP payload version or checksum are wrong");
+ return;
+ }
+
+ functionsCtx *f_ctx = functionsCtxCreate();
+ rioInitWithBuffer(&payload, data);
+
+ /* Read until reaching last 10 bytes that should contain RDB version and checksum. */
+ while (data_len - payload.io.buffer.pos > 10) {
+ int type;
+ if ((type = rdbLoadType(&payload)) == -1) {
+ err = sdsnew("can not read data type");
+ goto load_error;
+ }
+ if (type != RDB_OPCODE_FUNCTION) {
+ err = sdsnew("given type is not a function");
+ goto load_error;
+ }
+ if (rdbFunctionLoad(&payload, rdbver, f_ctx, RDBFLAGS_NONE, &err) != C_OK) {
+ if (!err) {
+ err = sdsnew("failed loading the given functions payload");
+ }
+ goto load_error;
+ }
+ }
+
+ if (restore_replicy == restorePolicy_Flush) {
+ functionsCtxSwapWithCurrent(f_ctx);
+ f_ctx = NULL; /* avoid releasing the f_ctx in the end */
+ } else {
+ if (restore_replicy == restorePolicy_Append) {
+ /* First make sure there is only new functions */
+ iter = dictGetIterator(f_ctx->functions);
+ dictEntry *entry = NULL;
+ while ((entry = dictNext(iter))) {
+ functionInfo *fi = dictGetVal(entry);
+ if (dictFetchValue(functions_ctx->functions, fi->name)) {
+ /* function already exists, failed the restore. */
+ err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
+ goto load_error;
+ }
+ }
+ dictReleaseIterator(iter);
+ }
+ iter = dictGetIterator(f_ctx->functions);
+ dictEntry *entry = NULL;
+ while ((entry = dictNext(iter))) {
+ functionInfo *fi = dictGetVal(entry);
+ dictReplace(functions_ctx->functions, fi->name, fi);
+ dictSetVal(f_ctx->functions, entry, NULL); /* make sure value will not be disposed */
+ }
+ }
+
+ /* Indicate that the command changed the data so it will be replicated and
+ * counted as a data change (for persistence configuration) */
+ server.dirty++;
+
+load_error:
+ if (err) {
+ addReplyErrorSds(c, err);
+ } else {
+ addReply(c, shared.ok);
+ }
+ if (iter) {
+ dictReleaseIterator(iter);
+ }
+ if (f_ctx) {
+ functionsCtxFree(f_ctx);
+ }
+}
+
void functionFlushCommand(client *c) {
if (c->argc > 3) {
addReplySubcommandSyntaxError(c);
@@ -434,6 +591,15 @@ void functionHelpCommand(client *c) {
" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
" * ASYNC: Asynchronously flush the functions.",
" * SYNC: Synchronously flush the functions.",
+"DUMP",
+" Returns a serialized payload representing the current functions, can be restored using FUNCTION RESTORE command",
+"RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]",
+" Restore the functions represented by the given payload, it is possible to give a restore policy to",
+" control how to handle existing functions (default APPEND):",
+" * FLUSH: delete all existing functions.",
+" * APPEND: appends the restored functions to the existing functions. On collision, abort.",
+" * REPLACE: appends the restored functions to the existing functions, On collision, replace the old",
+" function with the new function.",
NULL };
addReplyHelp(c, help);
}
diff --git a/src/rdb.c b/src/rdb.c
index 108804741..2126d6e6d 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1214,6 +1214,30 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
return io.bytes;
}
+int functionsSaveRio(rio *rdb) {
+ int ret = C_ERR;
+ dict *functions = functionsGet();
+ dictIterator *iter = dictGetIterator(functions);
+ dictEntry *entry = NULL;
+ while ((entry = dictNext(iter))) {
+ rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
+ functionInfo *fi = dictGetVal(entry);
+ if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done;
+ if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done;
+ if (fi->desc) {
+ if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */
+ if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done;
+ } else {
+ if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */
+ }
+ if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done;
+ }
+ ret = C_OK;
+done:
+ dictReleaseIterator(iter);
+ return ret;
+}
+
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@@ -1240,24 +1264,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
- /* save functions */
- dict *functions = functionsGet();
- dictIterator *iter = dictGetIterator(functions);
- dictEntry *entry = NULL;
- while ((entry = dictNext(iter))) {
- rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
- functionInfo* fi = dictGetVal(entry);
- if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr;
- if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr;
- if (fi->desc) {
- if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */
- if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr;
- } else {
- if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */
- }
- if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr;
- }
- dictReleaseIterator(iter);
+ if (functionsSaveRio(rdb) != C_OK) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -2697,42 +2704,48 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
-static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags) {
+/* Save the given functions_ctx to the rdb.
+ * The err output parameter is optional and will be set with relevant error
+ * message on failure, it is the caller responsibility to free the error
+ * message on failure. */
+int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err) {
UNUSED(ver);
sds name = NULL;
sds engine_name = NULL;
sds desc = NULL;
sds blob = NULL;
- sds err = NULL;
uint64_t has_desc;
+ sds error = NULL;
int res = C_ERR;
if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
- serverLog(LL_WARNING, "Failed loading function name");
+ error = sdsnew("Failed loading function name");
goto error;
}
if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
- serverLog(LL_WARNING, "Failed loading engine name");
+ error = sdsnew("Failed loading engine name");
goto error;
}
if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) {
- serverLog(LL_WARNING, "Failed loading function desc indicator");
+ error = sdsnew("Failed loading function description indicator");
goto error;
}
if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
- serverLog(LL_WARNING, "Failed loading function desc");
+ error = sdsnew("Failed loading function description");
goto error;
}
if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
- serverLog(LL_WARNING, "Failed loading function blob");
+ error = sdsnew("Failed loading function blob");
goto error;
}
- if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &err, functions_ctx) != C_OK) {
- serverLog(LL_WARNING, "Failed compiling and saving the function %s", err);
+ if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &error, functions_ctx) != C_OK) {
+ if (!error) {
+ error = sdsnew("Failed creating the function");
+ }
goto error;
}
@@ -2743,7 +2756,14 @@ error:
if (engine_name) sdsfree(engine_name);
if (desc) sdsfree(desc);
if (blob) sdsfree(blob);
- if (err) sdsfree(err);
+ if (error) {
+ if (err) {
+ *err = error;
+ } else {
+ serverLog(LL_WARNING, "Failed creating function, %s", error);
+ sdsfree(error);
+ }
+ }
return res;
}
@@ -2964,8 +2984,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
continue; /* Read next opcode. */
}
} else if (type == RDB_OPCODE_FUNCTION) {
- if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags) != C_OK) {
- serverLog(LL_WARNING,"Failed loading function");
+ sds err = NULL;
+ if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags, &err) != C_OK) {
+ serverLog(LL_WARNING,"Failed loading function, %s", err);
+ sdsfree(err);
goto eoferr;
}
continue;
diff --git a/src/rdb.h b/src/rdb.h
index 66496bcec..5942c4333 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -169,7 +169,9 @@ int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
+int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
+int functionsSaveRio(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 81e4f4883..24e8fe562 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -5880,6 +5880,8 @@ cleanup:
static int clusterManagerCommandAddNode(int argc, char **argv) {
int success = 1;
redisReply *reply = NULL;
+ redisReply *function_restore_reply = NULL;
+ redisReply *function_list_reply = NULL;
char *ref_ip = NULL, *ip = NULL;
int ref_port = 0, port = 0;
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port))
@@ -5944,6 +5946,43 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
listAddNodeTail(cluster_manager.nodes, new_node);
added = 1;
+ if (!master_node) {
+ /* Send functions to the new node, if new node is a replica it will get the functions from its primary. */
+ clusterManagerLogInfo(">>> Getting functions from cluster\n");
+ reply = CLUSTER_MANAGER_COMMAND(refnode, "FUNCTION DUMP");
+ if (!clusterManagerCheckRedisReply(refnode, reply, &err)) {
+ clusterManagerLogInfo(">>> Failed retrieving Functions from the cluster, "
+ "skip this step as Redis version do not support function command (error = '%s')\n", err? err : "NULL reply");
+ if (err) zfree(err);
+ } else {
+ assert(reply->type == REDIS_REPLY_STRING);
+ clusterManagerLogInfo(">>> Send FUNCTION LIST to %s:%d to verify there is no functions in it\n", ip, port);
+ function_list_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION LIST");
+ if (!clusterManagerCheckRedisReply(new_node, function_list_reply, &err)) {
+ clusterManagerLogErr(">>> Failed on CLUSTER LIST (error = '%s')\r\n", err? err : "NULL reply");
+ if (err) zfree(err);
+ success = 0;
+ goto cleanup;
+ }
+ assert(function_list_reply->type == REDIS_REPLY_ARRAY);
+ if (function_list_reply->elements > 0) {
+ clusterManagerLogErr(">>> New node already contains functions and can not be added to the cluster. Use FUNCTION FLUSH and try again.\r\n");
+ success = 0;
+ goto cleanup;
+ }
+ clusterManagerLogInfo(">>> Send FUNCTION RESTORE to %s:%d\n", ip, port);
+ function_restore_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION RESTORE %b", reply->str, reply->len);
+ if (!clusterManagerCheckRedisReply(new_node, function_restore_reply, &err)) {
+ clusterManagerLogErr(">>> Failed loading functions to the new node (error = '%s')\r\n", err? err : "NULL reply");
+ if (err) zfree(err);
+ success = 0;
+ goto cleanup;
+ }
+ }
+ }
+
+ if (reply) freeReplyObject(reply);
+
// Send CLUSTER MEET command to the new node
clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it "
"join the cluster.\n", ip, port);
@@ -5968,6 +6007,8 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
cleanup:
if (!added && new_node) freeClusterManagerNode(new_node);
if (reply) freeReplyObject(reply);
+ if (function_restore_reply) freeReplyObject(function_restore_reply);
+ if (function_list_reply) freeReplyObject(function_list_reply);
return success;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
diff --git a/src/server.h b/src/server.h
index 8e4d8d902..471e8c72a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3162,6 +3162,7 @@ void migrateCommand(client *c);
void askingCommand(client *c);
void readonlyCommand(client *c);
void readwriteCommand(client *c);
+int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr);
void dumpCommand(client *c);
void objectCommand(client *c);
void memoryCommand(client *c);
@@ -3182,6 +3183,8 @@ void functionInfoCommand(client *c);
void functionListCommand(client *c);
void functionHelpCommand(client *c);
void functionFlushCommand(client *c);
+void functionRestoreCommand(client *c);
+void functionDumpCommand(client *c);
void timeCommand(client *c);
void bitopCommand(client *c);
void bitcountCommand(client *c);