summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/commands.c22
-rw-r--r--src/commands/function-flush.json37
-rw-r--r--src/functions.c40
-rw-r--r--src/functions.h1
-rw-r--r--src/lazyfree.c20
-rw-r--r--src/server.h2
-rw-r--r--tests/support/server.tcl1
-rw-r--r--tests/test_helper.tcl1
-rw-r--r--tests/unit/functions.tcl42
9 files changed, 165 insertions, 1 deletions
diff --git a/src/commands.c b/src/commands.c
index 252b009c1..c88072072 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -3055,6 +3055,27 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = {
{0}
};
+/********** FUNCTION FLUSH ********************/
+
+/* FUNCTION FLUSH history */
+#define FUNCTION_FLUSH_History NULL
+
+/* FUNCTION FLUSH hints */
+#define FUNCTION_FLUSH_Hints NULL
+
+/* FUNCTION FLUSH async argument table */
+struct redisCommandArg FUNCTION_FLUSH_async_Subargs[] = {
+{"async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE},
+{"sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
+/* FUNCTION FLUSH argument table */
+struct redisCommandArg FUNCTION_FLUSH_Args[] = {
+{"async",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_FLUSH_async_Subargs},
+{0}
+};
+
/********** FUNCTION HELP ********************/
/* FUNCTION HELP history */
@@ -3106,6 +3127,7 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
struct redisCommand FUNCTION_Subcommands[] = {
{"create","PATCH__TBD__15__","PATCH__TBD__14__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
{"delete","PATCH__TBD__23__","PATCH__TBD__22__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
+{"flush","PATCH__TBD__29__","PATCH__TBD__28__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
{"info","PATCH__TBD__11__","PATCH__TBD__10__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
{"kill","PATCH__TBD__19__","PATCH__TBD__18__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
diff --git a/src/commands/function-flush.json b/src/commands/function-flush.json
new file mode 100644
index 000000000..792f518e6
--- /dev/null
+++ b/src/commands/function-flush.json
@@ -0,0 +1,37 @@
+{
+ "FLUSH": {
+ "summary": "PATCH__TBD__29__",
+ "complexity": "PATCH__TBD__28__",
+ "group": "scripting",
+ "since": "7.0.0",
+ "arity": -2,
+ "container": "FUNCTION",
+ "function": "functionFlushCommand",
+ "command_flags": [
+ "NOSCRIPT",
+ "MAY_REPLICATE"
+ ],
+ "acl_categories": [
+ "SCRIPTING"
+ ],
+ "arguments": [
+ {
+ "name": "async",
+ "type": "oneof",
+ "optional": true,
+ "arguments": [
+ {
+ "name": "async",
+ "type": "pure-token",
+ "token": "ASYNC"
+ },
+ {
+ "name": "sync",
+ "type": "pure-token",
+ "token": "SYNC"
+ }
+ ]
+ }
+ ]
+ }
+}
diff --git a/src/functions.c b/src/functions.c
index 156616423..23843794d 100644
--- a/src/functions.c
+++ b/src/functions.c
@@ -372,6 +372,36 @@ void fcallroCommand(client *c) {
fcallCommandGeneric(c, 1);
}
+void functionFlushCommand(client *c) {
+ if (c->argc > 3) {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ int async = 0;
+ if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) {
+ async = 0;
+ } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) {
+ async = 1;
+ } else if (c->argc == 2) {
+ async = server.lazyfree_lazy_user_flush ? 1 : 0;
+ } else {
+ addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option");
+ return;
+ }
+
+ if (async) {
+ functionsCtx *old_f_ctx = functions_ctx;
+ functions_ctx = functionsCtxCreate();
+ freeFunctionsAsync(old_f_ctx);
+ } else {
+ functionsCtxClear(functions_ctx);
+ }
+ /* Indicate that the command changed the data so it will be replicated and
+ * counted as a data change (for persistence configuration) */
+ server.dirty++;
+ addReply(c,shared.ok);
+}
+
void functionHelpCommand(client *c) {
const char *help[] = {
"CREATE <ENGINE NAME> <FUNCTION NAME> [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>",
@@ -398,6 +428,12 @@ void functionHelpCommand(client *c) {
" In addition, returns a list of available engines.",
"KILL",
" Kill the current running function.",
+"FLUSH [ASYNC|SYNC]",
+" Delete all the functions.",
+" When called without the optional mode argument, the behavior is determined by the",
+" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
+" * ASYNC: Asynchronously flush the functions.",
+" * SYNC: Synchronously flush the functions.",
NULL };
addReplyHelp(c, help);
}
@@ -528,6 +564,10 @@ dict* functionsGet() {
return functions_ctx->functions;
}
+size_t functionsLen(functionsCtx *functions_ctx) {
+ return dictSize(functions_ctx->functions);
+}
+
/* Initialize engine data structures.
* Should be called once on server initialization */
int functionsInit() {
diff --git a/src/functions.h b/src/functions.h
index 4a1ec4a24..66147f97c 100644
--- a/src/functions.h
+++ b/src/functions.h
@@ -105,6 +105,7 @@ unsigned long functionsMemoryOverhead();
int functionsLoad(rio *rdb, int ver);
unsigned long functionsNum();
dict* functionsGet();
+size_t functionsLen(functionsCtx *functions_ctx);
functionsCtx* functionsCtxGetCurrent();
functionsCtx* functionsCtxCreate();
void functionsCtxFree(functionsCtx *functions_ctx);
diff --git a/src/lazyfree.c b/src/lazyfree.c
index 6127abe77..1cc521cbd 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -1,6 +1,7 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
+#include "functions.h"
static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
@@ -46,6 +47,15 @@ void lazyFreeLuaScripts(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
+/* Release the functions ctx. */
+void lazyFreeFunctionsCtx(void *args[]) {
+ functionsCtx *f_ctx = args[0];
+ size_t len = functionsLen(f_ctx);
+ functionsCtxFree(f_ctx);
+ atomicDecr(lazyfree_objects,len);
+ atomicIncr(lazyfreed_objects,len);
+}
+
/* Release replication backlog referencing memory. */
void lazyFreeReplicationBacklogRefMem(void *args[]) {
list *blocks = args[0];
@@ -193,6 +203,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) {
}
}
+/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
+void freeFunctionsAsync(functionsCtx *f_ctx) {
+ if (functionsLen(f_ctx) > LAZYFREE_THRESHOLD) {
+ atomicIncr(lazyfree_objects,functionsLen(f_ctx));
+ bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,f_ctx);
+ } else {
+ functionsCtxFree(f_ctx);
+ }
+}
+
/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
if (listLength(blocks) > LAZYFREE_THRESHOLD ||
diff --git a/src/server.h b/src/server.h
index 10f45323c..a3600e13a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2905,6 +2905,7 @@ int ldbPendingChildren(void);
sds luaCreateFunction(client *c, robj *body);
void luaLdbLineHook(lua_State *lua, lua_Debug *ar);
void freeLuaScriptsAsync(dict *lua_scripts);
+void freeFunctionsAsync(functionsCtx *f_ctx);
int ldbIsEnabled();
void ldbLog(sds entry);
void ldbLogRedisReply(char *reply);
@@ -3169,6 +3170,7 @@ void functionStatsCommand(client *c);
void functionInfoCommand(client *c);
void functionListCommand(client *c);
void functionHelpCommand(client *c);
+void functionFlushCommand(client *c);
void timeCommand(client *c);
void bitopCommand(client *c);
void bitcountCommand(client *c);
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index 4c63d7b3a..ee39c8df9 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -343,6 +343,7 @@ proc run_external_server_test {code overrides} {
}
r flushall
+ r function flush
# store overrides
set saved_config {}
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index d1405e3e3..570d9e85f 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -63,6 +63,7 @@ set ::all_tests {
unit/pubsub
unit/slowlog
unit/scripting
+ unit/functions
unit/maxmemory
unit/introspection
unit/introspection-2
diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl
index 0736a44da..27bb29b72 100644
--- a/tests/unit/functions.tcl
+++ b/tests/unit/functions.tcl
@@ -185,9 +185,34 @@ start_server {tags {"scripting"}} {
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
}
+
+ test {FUNCTION - test function flush} {
+ r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
+ assert_match {{name test engine LUA description {}}} [r function list]
+ r function flush
+ assert_match {} [r function list]
+
+ r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
+ assert_match {{name test engine LUA description {}}} [r function list]
+ r function flush async
+ assert_match {} [r function list]
+
+ r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
+ assert_match {{name test engine LUA description {}}} [r function list]
+ r function flush sync
+ assert_match {} [r function list]
+ }
+
+ test {FUNCTION - test function wrong argument} {
+ catch {r function flush bad_arg} e
+ assert_match {*only supports SYNC|ASYNC*} $e
+
+ catch {r function flush sync extra_arg} e
+ assert_match {*wrong number of arguments*} $e
+ }
}
-start_server {tags {"scripting repl"}} {
+start_server {tags {"scripting repl external:skip"}} {
start_server {} {
test "Connect a replica to the master instance" {
r -1 slaveof [srv 0 host] [srv 0 port]
@@ -221,6 +246,21 @@ start_server {tags {"scripting repl"}} {
}
}
+ test {FUNCTION - flush is replicated to replica} {
+ r function create LUA test DESCRIPTION {some description} {return 'hello'}
+ wait_for_condition 50 100 {
+ [r -1 function list] eq {{name test engine LUA description {some description}}}
+ } else {
+ fail "Failed waiting for function to replicate to replica"
+ }
+ r function flush
+ wait_for_condition 50 100 {
+ [r -1 function list] eq {}
+ } else {
+ fail "Failed waiting for function to replicate to replica"
+ }
+ }
+
test "Disconnecting the replica from master instance" {
r -1 slaveof no one
# creating a function after disconnect to make sure function